Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/07 06:48:25 UTC

[GitHub] [arrow-datafusion] Jimexist opened a new pull request #520: Impl window order by

Jimexist opened a new pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520


   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #.
   
   # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Jimexist edited a comment on pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist edited a comment on pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#issuecomment-861088442


   > This looks very nice @Jimexist -- I went over the code and saw only goodness :)
   > 
   > All that this PR needs to be mergeable in my opinion is to reset the Cargo `arrow*` references (now that arrow 4.3.0 has been released)
   
   Thank you for taking the time to review. The changes to the arrow references are now reverted.





[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651360294



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -113,54 +111,32 @@ impl BuiltInWindowFunctionExpr for NthValue {
         &self.name
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        Ok(Box::new(NthValueAccumulator::try_new(
-            self.kind,
-            self.data_type.clone(),
-        )?))
-    }
-}
-
-#[derive(Debug)]
-struct NthValueAccumulator {
-    kind: NthValueKind,
-    offset: u32,
-    value: ScalarValue,
-}
-
-impl NthValueAccumulator {
-    /// new count accumulator
-    pub fn try_new(kind: NthValueKind, data_type: DataType) -> Result<Self> {
-        Ok(Self {
-            kind,
-            offset: 0,
-            // null value of that data_type by default
-            value: ScalarValue::try_from(&data_type)?,
-        })
-    }
-}
-
-impl WindowAccumulator for NthValueAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.offset += 1;
-        match self.kind {
-            NthValueKind::Last => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::First if self.offset == 1 => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::Nth(n) if self.offset == n => {
-                self.value = values[0].clone();
-            }
-            _ => {}
+    fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result<ArrayRef> {
+        if values.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No arguments supplied to {}",
+                self.name()
+            )));
         }
-
-        Ok(None)
-    }
-
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.value.clone()))
+        let value = &values[0];
+        if value.len() != num_rows {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid data supplied to {}, expect {} rows, got {} rows",
+                self.name(),
+                num_rows,
+                value.len()
+            )));
+        }
+        if num_rows == 0 {

Review comment:
       But you are right, this would _not_ be passed a zero-length input. This check is just being pedantic.







[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651358846



##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -156,31 +162,72 @@ impl WindowExpr for BuiltInWindowExpr {
         self.window.expressions()
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        self.window.create_accumulator()
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        // FIXME, for now we assume all the rows belong to the same partition, which will not be the
+        // case when partition_by is supported, in which case we'll parallelize the calls.
+        // See https://github.com/apache/arrow-datafusion/issues/299
+        let values = self.evaluate_args(batch)?;
+        self.window.evaluate(batch.num_rows(), &values)
     }
 }
 
 /// A window expr that takes the form of an aggregate function
 #[derive(Debug)]
 pub struct AggregateWindowExpr {
     aggregate: Arc<dyn AggregateExpr>,
+    partition_by: Vec<Arc<dyn PhysicalExpr>>,
+    order_by: Vec<PhysicalSortExpr>,
+    window_frame: Option<WindowFrame>,
 }
 
-#[derive(Debug)]
-struct AggregateWindowAccumulator {
-    accumulator: Box<dyn Accumulator>,
-}
+impl AggregateWindowExpr {
+    /// the aggregate window function operates based on window frame, and by default the mode is
+    /// "range".
+    fn evaluation_mode(&self) -> WindowFrameUnits {
+        self.window_frame.unwrap_or_default().units
+    }
 
-impl WindowAccumulator for AggregateWindowAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.accumulator.update(values)?;
-        Ok(None)
+    /// create a new accumulator based on the underlying aggregation function
+    fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> {
+        let accumulator = self.aggregate.create_accumulator()?;
+        Ok(AggregateWindowAccumulator { accumulator })
     }
 
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.accumulator.evaluate()?))
+    /// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
+    /// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
+    /// results for peers) and concatenate the results.
+    fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {

Review comment:
       I will possibly change this naming when implementing #361, but for the moment `range` and `groups` both evaluate with peers, while `rows` evaluates row by row on each `scan`.
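
To illustrate the distinction drawn in this comment, here is a minimal, self-contained sketch in plain Rust. It is not the code from this PR: it uses `Vec<i64>` instead of Arrow arrays, and `peer_ranges`, `peer_based_sum`, and `row_based_sum` are hypothetical helper names. It assumes the input is already sorted by the `ORDER BY` key and that rows with equal keys are peers.

```rust
use std::ops::Range;

/// Split pre-sorted keys into ranges of adjacent equal keys (peer groups).
/// Hypothetical helper, not part of the PR.
fn peer_ranges<K: PartialEq>(keys: &[K]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=keys.len() {
        // Close the current group at the end of input or when the key changes.
        if i == keys.len() || keys[i] != keys[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

/// Running SUM evaluated once per peer group: every row in a group
/// receives the same cumulative total.
fn peer_based_sum(keys: &[i64], values: &[i64]) -> Vec<i64> {
    assert_eq!(keys.len(), values.len());
    let mut running = 0i64;
    let mut out = Vec::with_capacity(values.len());
    for range in peer_ranges(keys) {
        let len = range.end - range.start;
        running += values[range].iter().sum::<i64>();
        out.extend(std::iter::repeat(running).take(len));
    }
    out
}

/// Running SUM evaluated row by row: each row sees only the rows up to itself.
fn row_based_sum(values: &[i64]) -> Vec<i64> {
    let mut running = 0i64;
    values
        .iter()
        .map(|v| {
            running += v;
            running
        })
        .collect()
}
```

With keys `[1, 1, 2, 3, 3]` and values `[10, 20, 30, 40, 50]`, `peer_based_sum` yields `[30, 30, 60, 150, 150]`, whereas `row_based_sum` yields `[10, 30, 60, 100, 150]`.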







[GitHub] [arrow-datafusion] Jimexist commented on pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist commented on pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#issuecomment-861088442


   > This looks very nice @Jimexist -- I went over the code and saw only goodness :)
   > 
   > All that this PR needs to be mergeable in my opinion is to reset the Cargo `arrow*` references (now that arrow 4.3.0 has been released)
   
   Thank you for taking the time to review. The changes to arrow are now reverted.





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651068287



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -113,54 +111,32 @@ impl BuiltInWindowFunctionExpr for NthValue {
         &self.name
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        Ok(Box::new(NthValueAccumulator::try_new(
-            self.kind,
-            self.data_type.clone(),
-        )?))
-    }
-}
-
-#[derive(Debug)]
-struct NthValueAccumulator {
-    kind: NthValueKind,
-    offset: u32,
-    value: ScalarValue,
-}
-
-impl NthValueAccumulator {
-    /// new count accumulator
-    pub fn try_new(kind: NthValueKind, data_type: DataType) -> Result<Self> {
-        Ok(Self {
-            kind,
-            offset: 0,
-            // null value of that data_type by default
-            value: ScalarValue::try_from(&data_type)?,
-        })
-    }
-}
-
-impl WindowAccumulator for NthValueAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.offset += 1;
-        match self.kind {
-            NthValueKind::Last => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::First if self.offset == 1 => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::Nth(n) if self.offset == n => {
-                self.value = values[0].clone();
-            }
-            _ => {}
+    fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result<ArrayRef> {
+        if values.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No arguments supplied to {}",
+                self.name()
+            )));
         }
-
-        Ok(None)
-    }
-
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.value.clone()))
+        let value = &values[0];
+        if value.len() != num_rows {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid data supplied to {}, expect {} rows, got {} rows",
+                self.name(),
+                num_rows,
+                value.len()
+            )));
+        }
+        if num_rows == 0 {

Review comment:
       Could this function ever be passed a 0-row input? This check isn't a problem; I am just wondering if my mental model is correct.
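
As a rough illustration of why a zero-row guard can matter, the sketch below mimics the shape of the new `evaluate` in plain Rust. The rest of the function is not shown in the diff, so the index selection and the choice to fill every row with the selected value (nulls when the position is out of range) are assumptions. `Option<i32>` stands in for a nullable Arrow array, and the `Nth(0)` error is invented for the sketch.

```rust
/// Stand-in for the `NthValueKind` enum referenced in the diff.
#[derive(Debug, Clone, Copy)]
enum NthValueKind {
    First,
    Last,
    Nth(u32),
}

/// Evaluate over one column; nulls are modelled as `None`.
/// Assumption: the selected value is broadcast to every output row.
fn evaluate(
    kind: NthValueKind,
    num_rows: usize,
    values: &[Option<i32>],
) -> Result<Vec<Option<i32>>, String> {
    if values.len() != num_rows {
        return Err(format!(
            "expect {} rows, got {} rows",
            num_rows,
            values.len()
        ));
    }
    // Guard first: with zero rows, `num_rows - 1` below would underflow.
    if num_rows == 0 {
        return Ok(Vec::new());
    }
    let index = match kind {
        NthValueKind::First => 0,
        NthValueKind::Last => num_rows - 1,
        // Positions are treated as 1-based here (an assumption).
        NthValueKind::Nth(0) => return Err("position must be >= 1".to_string()),
        NthValueKind::Nth(n) => n as usize - 1,
    };
    // Out-of-range positions produce nulls in this sketch.
    let picked = values.get(index).copied().flatten();
    Ok(vec![picked; num_rows])
}
```

In this sketch, removing the `num_rows == 0` check would make the `Last` arm compute `0 - 1` on an empty input, so the guard is cheap insurance even if callers never pass an empty batch.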

##########
File path: integration-tests/sqls/simple_window_ordered_aggregation.sql
##########
@@ -0,0 +1,26 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+
+-- http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language gOVERning permissions and
+-- limitations under the License.
+
+SELECT

Review comment:
       Very nice 👍

##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -1083,15 +1083,15 @@ fn dictionary_create_group_by_value<K: ArrowDictionaryKeyType>(
     let dict_col = col.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
 
     // look up the index in the values dictionary
-    let keys_col = dict_col.keys_array();
+    let keys_col = dict_col.keys();

Review comment:
       This change in API was not published with arrow 4.3 (it will be in arrow 5.0) but it is cool to leave the changes in this PR anyways 👍

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -202,9 +249,55 @@ impl WindowExpr for AggregateWindowExpr {
         self.aggregate.expressions()
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        let accumulator = self.aggregate.create_accumulator()?;
-        Ok(Box::new(AggregateWindowAccumulator { accumulator }))
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    /// evaluate the window function values against the batch
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        // FIXME, for now we assume all the rows belong to the same partition, which will not be the
+        // case when partition_by is supported, in which case we'll parallelize the calls.
+        // See https://github.com/apache/arrow-datafusion/issues/299
+        match self.evaluation_mode() {
+            WindowFrameUnits::Range => self.peer_based_evaluate(batch),
+            WindowFrameUnits::Rows => self.row_based_evaluate(batch),
+            WindowFrameUnits::Groups => self.group_based_evaluate(batch),
+        }
+    }
+}
+
+/// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum
+/// across evaluation arguments based on peer equivalences.
+#[derive(Debug)]
+struct AggregateWindowAccumulator {
+    accumulator: Box<dyn Accumulator>,
+}
+
+impl AggregateWindowAccumulator {
+    /// scan one peer group of values (as arguments to window function) given by the value_range
+    /// and return evaluation result that are of the same number of rows.
+    fn scan_peers(
+        &mut self,
+        values: &[ArrayRef],
+        value_range: &Range<usize>,
+    ) -> Result<ArrayRef> {
+        if value_range.is_empty() {
+            return Err(DataFusionError::Internal(
+                "Value range cannot be empty".to_owned(),
+            ));
+        }
+        let len = value_range.end - value_range.start;
+        let values = values
+            .iter()
+            .map(|v| v.slice(value_range.start, len))

Review comment:
       👍

##########
File path: datafusion/src/physical_plan/window_functions.rs
##########
@@ -207,7 +208,10 @@ pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
     }
 }
 
-/// A window expression that is a built-in window function
+/// A window expression that is a built-in window function.
+///
+/// Note that unlike aggregation based window functions, built-in window functions normally ignore
+/// window frame spec, with th expression of first_value, last_value, and nth_value.

Review comment:
       ```suggestion
   /// window frame spec, with the exception of first_value, last_value, and nth_value.
   ```

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1299,6 +1309,49 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn window_order_by() -> Result<()> {
+        let results = execute(
+            "SELECT \
+            c1, \
+            c2, \
+            ROW_NUMBER() OVER (ORDER BY c1, c2), \
+            FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
+            SUM(c2) OVER (ORDER BY c1, c2), \
+            COUNT(c2) OVER (ORDER BY c1, c2), \
+            MAX(c2) OVER (ORDER BY c1, c2), \
+            MIN(c2) OVER (ORDER BY c1, c2), \
+            AVG(c2) OVER (ORDER BY c1, c2) \
+            FROM test \
+            ORDER BY c1, c2 \
+            LIMIT 5",
+            4,
+        )
+        .await?;
+        // result in one batch, although e.g. having 2 batches do not change

Review comment:
       👍

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -113,54 +111,32 @@ impl BuiltInWindowFunctionExpr for NthValue {
         &self.name
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        Ok(Box::new(NthValueAccumulator::try_new(
-            self.kind,
-            self.data_type.clone(),
-        )?))
-    }
-}
-
-#[derive(Debug)]
-struct NthValueAccumulator {
-    kind: NthValueKind,
-    offset: u32,
-    value: ScalarValue,
-}
-
-impl NthValueAccumulator {
-    /// new count accumulator
-    pub fn try_new(kind: NthValueKind, data_type: DataType) -> Result<Self> {
-        Ok(Self {
-            kind,
-            offset: 0,
-            // null value of that data_type by default
-            value: ScalarValue::try_from(&data_type)?,
-        })
-    }
-}
-
-impl WindowAccumulator for NthValueAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.offset += 1;
-        match self.kind {
-            NthValueKind::Last => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::First if self.offset == 1 => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::Nth(n) if self.offset == n => {
-                self.value = values[0].clone();
-            }
-            _ => {}
+    fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result<ArrayRef> {
+        if values.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No arguments supplied to {}",
+                self.name()
+            )));
         }
-
-        Ok(None)
-    }
-
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.value.clone()))
+        let value = &values[0];
+        if value.len() != num_rows {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid data supplied to {}, expect {} rows, got {} rows",
+                self.name(),
+                num_rows,
+                value.len()
+            )));
+        }
+        if num_rows == 0 {
+            return Ok(new_empty_array(value.data_type()));
+        }
+        let index: usize = match self.kind {

Review comment:
       👍

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -156,31 +162,72 @@ impl WindowExpr for BuiltInWindowExpr {
         self.window.expressions()
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        self.window.create_accumulator()
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        // FIXME, for now we assume all the rows belong to the same partition, which will not be the
+        // case when partition_by is supported, in which case we'll parallelize the calls.
+        // See https://github.com/apache/arrow-datafusion/issues/299
+        let values = self.evaluate_args(batch)?;
+        self.window.evaluate(batch.num_rows(), &values)
     }
 }
 
 /// A window expr that takes the form of an aggregate function
 #[derive(Debug)]
 pub struct AggregateWindowExpr {
     aggregate: Arc<dyn AggregateExpr>,
+    partition_by: Vec<Arc<dyn PhysicalExpr>>,
+    order_by: Vec<PhysicalSortExpr>,
+    window_frame: Option<WindowFrame>,
 }
 
-#[derive(Debug)]
-struct AggregateWindowAccumulator {
-    accumulator: Box<dyn Accumulator>,
-}
+impl AggregateWindowExpr {
+    /// the aggregate window function operates based on window frame, and by default the mode is
+    /// "range".
+    fn evaluation_mode(&self) -> WindowFrameUnits {
+        self.window_frame.unwrap_or_default().units
+    }
 
-impl WindowAccumulator for AggregateWindowAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.accumulator.update(values)?;
-        Ok(None)
+    /// create a new accumulator based on the underlying aggregation function
+    fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> {
+        let accumulator = self.aggregate.create_accumulator()?;
+        Ok(AggregateWindowAccumulator { accumulator })
     }
 
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.accumulator.evaluate()?))
+    /// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
+    /// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
+    /// results for peers) and concatenate the results.
+    fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {

Review comment:
       I don't understand the naming of `peer` here (rather than `range_based_evaluate` for example, to match with `WindowFrameUnits::Range`)
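
For readers following the naming question, the dispatch in the diff can be pictured with the simplified sketch below. The `WindowFrame` here is a reduced stand-in with only a `units` field (an assumption for illustration), and `evaluation_strategy` is a hypothetical function that returns the name of the method the diff dispatches to. The default of `Range` follows the doc comment on `evaluation_mode`.

```rust
/// Frame units as named in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WindowFrameUnits {
    Rows,
    Range,
    Groups,
}

/// Reduced stand-in for the window frame: only the units are modelled.
#[derive(Debug, Clone, Copy)]
struct WindowFrame {
    units: WindowFrameUnits,
}

impl Default for WindowFrame {
    /// Per the doc comment in the diff, the default mode is "range".
    fn default() -> Self {
        WindowFrame {
            units: WindowFrameUnits::Range,
        }
    }
}

/// Hypothetical helper: resolve an optional frame to the name of the
/// evaluation method that the `match` in the diff would call.
fn evaluation_strategy(window_frame: Option<WindowFrame>) -> &'static str {
    match window_frame.unwrap_or_default().units {
        WindowFrameUnits::Range => "peer_based_evaluate",
        WindowFrameUnits::Rows => "row_based_evaluate",
        WindowFrameUnits::Groups => "group_based_evaluate",
    }
}
```

A query with no explicit frame therefore resolves to `peer_based_evaluate` in this sketch, which is where the `peer` name meets `WindowFrameUnits::Range`.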

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -172,68 +148,47 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow::{array::*, datatypes::*};
 
-    fn test_i32_result(expr: Arc<NthValue>, expected: i32) -> Result<()> {
+    fn test_i32_result(expr: NthValue, expected: Vec<i32>) -> Result<()> {
         let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5, -6, 7, 8]));
+        let values = vec![arr];

Review comment:
       This test change shows the nice refactoring

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1299,6 +1309,49 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn window_order_by() -> Result<()> {
+        let results = execute(
+            "SELECT \
+            c1, \
+            c2, \
+            ROW_NUMBER() OVER (ORDER BY c1, c2), \
+            FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
+            SUM(c2) OVER (ORDER BY c1, c2), \
+            COUNT(c2) OVER (ORDER BY c1, c2), \
+            MAX(c2) OVER (ORDER BY c1, c2), \
+            MIN(c2) OVER (ORDER BY c1, c2), \
+            AVG(c2) OVER (ORDER BY c1, c2) \
+            FROM test \
+            ORDER BY c1, c2 \
+            LIMIT 5",
+            4,
+        )
+        .await?;
+        // result in one batch, although e.g. having 2 batches do not change
+        // result semantics, having a len=1 assertion upfront keeps surprises
+        // at bay
+        assert_eq!(results.len(), 1);
+
+        let expected = vec![
+            "+----+----+--------------+-----------------+----------------+------------------------+---------+-----------+---------+---------+---------+",
+            "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(c2) | LAST_VALUE(c2) | NTH_VALUE(c2,Int64(2)) | SUM(c2) | COUNT(c2) | MAX(c2) | MIN(c2) | AVG(c2) |",
+            "+----+----+--------------+-----------------+----------------+------------------------+---------+-----------+---------+---------+---------+",
+            "| 0  | 1  | 1            | 1               | 10             | 2                      | 1       | 1         | 1       | 1       | 1       |",

Review comment:
       I double checked these in postgres. 👍

##########
File path: ballista/rust/core/Cargo.toml
##########
@@ -40,7 +40,7 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow-flight = { version = "4.0"  }
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }

Review comment:
       4.3.0 has been released so we can probably revert this change

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -465,15 +467,65 @@ pub trait WindowExpr: Send + Sync + Debug {
         "WindowExpr: default name"
     }
 
-    /// the accumulator used to accumulate values from the expressions.
-    /// the accumulator expects the same number of arguments as `expressions` and must
-    /// return states with the same description as `state_fields`
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>>;
-
     /// expressions that are passed to the WindowAccumulator.

Review comment:
       The `WindowExpr` trait is looking 👌







[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651356971



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -113,54 +111,32 @@ impl BuiltInWindowFunctionExpr for NthValue {
         &self.name
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        Ok(Box::new(NthValueAccumulator::try_new(
-            self.kind,
-            self.data_type.clone(),
-        )?))
-    }
-}
-
-#[derive(Debug)]
-struct NthValueAccumulator {
-    kind: NthValueKind,
-    offset: u32,
-    value: ScalarValue,
-}
-
-impl NthValueAccumulator {
-    /// new count accumulator
-    pub fn try_new(kind: NthValueKind, data_type: DataType) -> Result<Self> {
-        Ok(Self {
-            kind,
-            offset: 0,
-            // null value of that data_type by default
-            value: ScalarValue::try_from(&data_type)?,
-        })
-    }
-}
-
-impl WindowAccumulator for NthValueAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.offset += 1;
-        match self.kind {
-            NthValueKind::Last => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::First if self.offset == 1 => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::Nth(n) if self.offset == n => {
-                self.value = values[0].clone();
-            }
-            _ => {}
+    fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result<ArrayRef> {
+        if values.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No arguments supplied to {}",
+                self.name()
+            )));
         }
-
-        Ok(None)
-    }
-
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.value.clone()))
+        let value = &values[0];
+        if value.len() != num_rows {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid data supplied to {}, expect {} rows, got {} rows",
+                self.name(),
+                num_rows,
+                value.len()
+            )));
+        }
+        if num_rows == 0 {

Review comment:
       This will be changed in a later pull request.







[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #520: WIP Impl window order by

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#issuecomment-858784623


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/520?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#520](https://codecov.io/gh/apache/arrow-datafusion/pull/520?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5a3fcb0) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/5c88450a0286c98cdd4b0679f6b09b7eee1c3570?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5c88450) will **decrease** coverage by `0.00%`.
   > The diff coverage is `48.14%`.
   
   > :exclamation: Current head 5a3fcb0 differs from pull request most recent head 800563f. Consider uploading reports for the commit 800563f to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/520/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/520?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #520      +/-   ##
   ==========================================
   - Coverage   76.09%   76.09%   -0.01%     
   ==========================================
     Files         156      156              
     Lines       27031    27068      +37     
   ==========================================
   + Hits        20570    20598      +28     
   - Misses       6461     6470       +9     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/520?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ta/rust/core/src/serde/physical\_plan/from\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/520/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9zZXJkZS9waHlzaWNhbF9wbGFuL2Zyb21fcHJvdG8ucnM=) | `41.13% <0.00%> (+3.41%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/520/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `72.64% <0.00%> (-6.06%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/520/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9wbGFubmVyLnJz) | `78.51% <57.14%> (-1.69%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/windows.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/520/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi93aW5kb3dzLnJz) | `83.81% <65.21%> (-2.41%)` | :arrow_down: |
   | [datafusion/src/logical\_plan/window\_frames.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/520/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvbG9naWNhbF9wbGFuL3dpbmRvd19mcmFtZXMucnM=) | `89.39% <0.00%> (+1.51%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/520?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/520?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5c88450...800563f](https://codecov.io/gh/apache/arrow-datafusion/pull/520?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [arrow-datafusion] codecov-commenter edited a comment on pull request #520: WIP Impl window order by

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#issuecomment-858784623









[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r650504538



##########
File path: ballista/rust/core/Cargo.toml
##########
@@ -40,7 +40,7 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow-flight = { version = "4.0"  }
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }

Review comment:
       This pin is needed because the change relies on https://github.com/apache/arrow-datafusion/pull/520/files#diff-d76aa4d1931f29ba8725c7d31887fb45c5d8d74a026dae914e2012be1a15be33R28

##########
File path: ballista/rust/core/Cargo.toml
##########
@@ -40,7 +40,7 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow-flight = { version = "4.0"  }
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }

Review comment:
       > For this we need to wait for 4.3.0. This includes the required changes, no?
   >
   > https://github.com/apache/arrow-rs/pull/444
   
   Yes, it does. Once the new crate version is published, I will remove the changes to the Cargo.toml files.







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r650546802



##########
File path: ballista/rust/core/Cargo.toml
##########
@@ -40,7 +40,7 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow-flight = { version = "4.0"  }
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }

Review comment:
       For this we need to wait for 4.3.0. This includes the required changes, no?
   
   https://github.com/apache/arrow-rs/pull/444
   







[GitHub] [arrow-datafusion] Jimexist commented on pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist commented on pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#issuecomment-861614647


   @alamb this pull request is ready now





[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #520: Implement window functions with `order_by` clause

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651361200



##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -156,31 +162,72 @@ impl WindowExpr for BuiltInWindowExpr {
         self.window.expressions()
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        self.window.create_accumulator()
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        // FIXME, for now we assume all the rows belong to the same partition, which will not be the
+        // case when partition_by is supported, in which case we'll parallelize the calls.
+        // See https://github.com/apache/arrow-datafusion/issues/299
+        let values = self.evaluate_args(batch)?;
+        self.window.evaluate(batch.num_rows(), &values)
     }
 }
 
 /// A window expr that takes the form of an aggregate function
 #[derive(Debug)]
 pub struct AggregateWindowExpr {
     aggregate: Arc<dyn AggregateExpr>,
+    partition_by: Vec<Arc<dyn PhysicalExpr>>,
+    order_by: Vec<PhysicalSortExpr>,
+    window_frame: Option<WindowFrame>,
 }
 
-#[derive(Debug)]
-struct AggregateWindowAccumulator {
-    accumulator: Box<dyn Accumulator>,
-}
+impl AggregateWindowExpr {
+    /// the aggregate window function operates based on window frame, and by default the mode is
+    /// "range".
+    fn evaluation_mode(&self) -> WindowFrameUnits {
+        self.window_frame.unwrap_or_default().units
+    }
 
-impl WindowAccumulator for AggregateWindowAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.accumulator.update(values)?;
-        Ok(None)
+    /// create a new accumulator based on the underlying aggregation function
+    fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> {
+        let accumulator = self.aggregate.create_accumulator()?;
+        Ok(AggregateWindowAccumulator { accumulator })
     }
 
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.accumulator.evaluate()?))
+    /// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
+    /// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
+    /// results for peers) and concatenate the results.
+    fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {

Review comment:
       Since this is a private function, I guess I can leave the naming for a later change.
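
For readers following the doc comment on `peer_based_evaluate` in the hunk above, here is a minimal sketch of what peer-based evaluation could look like, assuming the rows are already sorted by the `order_by` key. It is not the code from this pull request: `peer_group_ranges` and `peer_based_sum` are hypothetical names, plain `i64` slices stand in for Arrow arrays, and a running SUM stands in for an arbitrary aggregate. Rows with equal sort keys are treated as peers and share one result, which is how the default "range" frame behaves.

```rust
use std::ops::Range;

/// Split pre-sorted sort keys into ranges of consecutive equal values
/// (peer groups). Hypothetical helper, not part of DataFusion.
fn peer_group_ranges(sort_keys: &[i64]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=sort_keys.len() {
        // Close the current group at the end of input or when the key changes.
        if i == sort_keys.len() || sort_keys[i] != sort_keys[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

/// Running SUM where every row in a peer group receives the same value:
/// the total of all rows up to and including the end of its group.
/// Assumes `sort_keys` is sorted and has the same length as `values`.
fn peer_based_sum(sort_keys: &[i64], values: &[i64]) -> Vec<i64> {
    assert_eq!(sort_keys.len(), values.len());
    let mut out = Vec::with_capacity(values.len());
    let mut running = 0i64;
    for range in peer_group_ranges(sort_keys) {
        // Fold the whole peer group into the accumulator first...
        running += values[range.clone()].iter().sum::<i64>();
        // ...then emit the same result once per row in the group.
        out.extend(std::iter::repeat(running).take(range.len()));
    }
    out
}
```

With sort keys `[1, 1, 2, 3, 3]` and values `[10, 20, 30, 40, 50]`, the peer groups are rows 0..2, 2..3 and 3..5, so `peer_based_sum` returns `[30, 30, 60, 150, 150]`. The accumulator is updated once per peer group rather than once per row, which is why the per-group results can simply be concatenated.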



