You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/29 18:49:40 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #638: add integration tests for rank, dense_rank, fix last_value evaluation with rank

alamb commented on a change in pull request #638:
URL: https://github.com/apache/arrow-datafusion/pull/638#discussion_r660878629



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -138,21 +140,56 @@ pub(crate) struct NthValueEvaluator {
 }
 
 impl PartitionEvaluator for NthValueEvaluator {
-    fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
-        let value = &self.values[0];
+    fn include_rank(&self) -> bool {
+        true
+    }
+
+    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
+        unreachable!("first, last, and nth_value evaluation must be called with evaluate_partition_with_rank")
+    }
+
+    fn evaluate_partition_with_rank(
+        &self,
+        partition: Range<usize>,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        let arr = &self.values[0];
         let num_rows = partition.end - partition.start;
-        let value = value.slice(partition.start, num_rows);
-        let index: usize = match self.kind {
-            NthValueKind::First => 0,
-            NthValueKind::Last => (num_rows as usize) - 1,
-            NthValueKind::Nth(n) => (n as usize) - 1,
-        };
-        Ok(if index >= num_rows {
-            new_null_array(value.data_type(), num_rows)
-        } else {
-            let value = ScalarValue::try_from_array(&value, index)?;
-            value.to_array_of_size(num_rows)
-        })
+        match self.kind {
+            NthValueKind::First => {
+                let value = ScalarValue::try_from_array(arr, partition.start)?;
+                Ok(value.to_array_of_size(num_rows))
+            }
+            NthValueKind::Last => {
+                // because the default window frame is between unbounded preceding and current
+                // row with peer evaluation, hence the last rows expands until the end of the peers
+                let values = ranks_in_partition
+                    .iter()
+                    .map(|range| {
+                        let len = range.end - range.start;
+                        let value = ScalarValue::try_from_array(arr, range.end - 1)?;
+                        Ok(iter::repeat(value).take(len))
+                    })
+                    .collect::<Result<Vec<_>>>()?
+                    .into_iter()
+                    .flatten();
+                ScalarValue::iter_to_array(values)
+            }
+            NthValueKind::Nth(n) => {
+                let index = (n as usize) - 1;
+                if index >= num_rows {
+                    Ok(new_null_array(arr.data_type(), num_rows))
+                } else {
+                    let value =
+                        ScalarValue::try_from_array(arr, partition.start + index)?;
+                    let arr = value.to_array_of_size(num_rows);
+                    // because the default window frame is between unbounded preceding and current
+                    // row, hence the shift because for values with indices < index they should be
+                    // null. This changes when window frames other than default is implemented
+                    shift(arr.as_ref(), index as i64).map_err(DataFusionError::ArrowError)

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -138,21 +140,56 @@ pub(crate) struct NthValueEvaluator {
 }
 
 impl PartitionEvaluator for NthValueEvaluator {
-    fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
-        let value = &self.values[0];
+    fn include_rank(&self) -> bool {
+        true
+    }
+
+    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
+        unreachable!("first, last, and nth_value evaluation must be called with evaluate_partition_with_rank")
+    }
+
+    fn evaluate_partition_with_rank(
+        &self,
+        partition: Range<usize>,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        let arr = &self.values[0];
         let num_rows = partition.end - partition.start;
-        let value = value.slice(partition.start, num_rows);
-        let index: usize = match self.kind {
-            NthValueKind::First => 0,
-            NthValueKind::Last => (num_rows as usize) - 1,
-            NthValueKind::Nth(n) => (n as usize) - 1,
-        };
-        Ok(if index >= num_rows {
-            new_null_array(value.data_type(), num_rows)
-        } else {
-            let value = ScalarValue::try_from_array(&value, index)?;
-            value.to_array_of_size(num_rows)
-        })
+        match self.kind {
+            NthValueKind::First => {
+                let value = ScalarValue::try_from_array(arr, partition.start)?;
+                Ok(value.to_array_of_size(num_rows))
+            }
+            NthValueKind::Last => {
+                // because the default window frame is between unbounded preceding and current
+                // row with peer evaluation, hence the last rows expands until the end of the peers
+                let values = ranks_in_partition
+                    .iter()
+                    .map(|range| {
+                        let len = range.end - range.start;
+                        let value = ScalarValue::try_from_array(arr, range.end - 1)?;

Review comment:
       this is very cool




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org