Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/02 08:10:30 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #4078: Custom window frame support extended to built-in window functions

mustafasrepo opened a new pull request, #4078:
URL: https://github.com/apache/arrow-datafusion/pull/4078

   
   # Which issue does this PR close?
   
   Closes #4076.
   
   # Rationale for this change
   
   With this change, built-in window functions can run with custom window frames, in queries of the form:
   
   ```sql
   SELECT
        FIRST_VALUE(c4) OVER(ORDER BY c9) as first_value1,
        FIRST_VALUE(c4) OVER(ORDER BY c9) as first_value2,
        LAST_VALUE(c4) OVER(ORDER BY c9) as last_value1,
        LAST_VALUE(c4) OVER(ORDER BY c9) as last_value2
        FROM aggregate_test_100
        ORDER BY c9
        LIMIT 5
   ```
   
   # What changes are included in this PR?
   
   - We added range calculation support for the built-in window functions FIRST_VALUE, LAST_VALUE, and NTH_VALUE. (For the other window functions, the contents of the window frame do not affect the result; see [https://www.postgresql.org/docs/current/functions-window.html](https://www.postgresql.org/docs/current/functions-window.html).) So that both window functions and aggregate functions can use the range calculation code, we moved it from [aggregate.rs](https://github.com/synnada-ai/arrow-datafusion/blob/feature/builtin_window_running/datafusion/physical-expr/src/window/aggregate.rs) to [window_expr.rs](https://github.com/synnada-ai/arrow-datafusion/blob/feature/builtin_window_running/datafusion/physical-expr/src/window/window_expr.rs).
   - We added tests for built-in window functions.
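
To illustrate why the frame matters for these three functions in particular, here is a minimal sketch that assumes the frame is already known as a half-open row range `[start, end)` within a partition. It uses plain slices rather than Arrow arrays, and the function names are hypothetical; it is not the code in this PR.

```rust
/// Value functions evaluated over a half-open frame `[start, end)`.
/// Each returns `None` when the frame is empty, out of bounds, or the
/// requested row lies outside the frame.
/// (Hypothetical helpers for illustration; not the DataFusion API.)
fn first_value<T: Copy>(values: &[T], start: usize, end: usize) -> Option<T> {
    values.get(start..end)?.first().copied()
}

fn last_value<T: Copy>(values: &[T], start: usize, end: usize) -> Option<T> {
    values.get(start..end)?.last().copied()
}

/// `n` is 1-based, as in SQL's `NTH_VALUE(expr, n)`.
fn nth_value<T: Copy>(values: &[T], start: usize, end: usize, n: usize) -> Option<T> {
    let frame = values.get(start..end)?;
    // `checked_sub` rejects n == 0, which has no meaning for a 1-based index.
    frame.get(n.checked_sub(1)?).copied()
}
```

With `values = [10, 20, 30, 40, 50]` and the frame `[1, 4)`, the first value is 20, the last is 40, and the second is 30. Changing the frame changes all three results, which is why these functions need the range calculation.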
   
   
   # Are there any user-facing changes?
   
   N/A


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #4078: Custom window frame support extended to built-in window functions

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #4078:
URL: https://github.com/apache/arrow-datafusion/pull/4078#issuecomment-1304501224

   Benchmark runs are scheduled for baseline = 6d00bd990ce5644181ad1549a6c70c8406219070 and contender = 238e179224661f681b20b9ae32f59efd5a3b0713. 238e179224661f681b20b9ae32f59efd5a3b0713 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/f52a46a792a94a97853290d6f7721faa...74c063638fb14320947eb9ebc781dbf0/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/7d1b5fadf46a4bc6899482ab4ff89eb8...6d60ef8ea91e4d7eaaf940dfad75931d/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/0fc21d50840941c0b18e380922707345...1500f1fdca064e84b49e8926debf610b/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/0b2aba51e06545d5ba6b7d647606335c...78c52e2c3322468490cc21e245460cb5/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow-datafusion] alamb commented on pull request #4078: Custom window frame support extended to built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4078:
URL: https://github.com/apache/arrow-datafusion/pull/4078#issuecomment-1302673487

   I again ran out of time today but it is on my list for first thing tomorrow




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4078: Custom window frame support extended to built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4078:
URL: https://github.com/apache/arrow-datafusion/pull/4078#discussion_r1014125149


##########
datafusion/physical-expr/src/window/aggregate.rs:
##########
@@ -103,368 +82,86 @@ impl WindowExpr for AggregateWindowExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        let num_rows = batch.num_rows();
+        let partition_columns = self.partition_columns(batch)?;
         let partition_points =
-            self.evaluate_partition_points(num_rows, &self.partition_columns(batch)?)?;
+            self.evaluate_partition_points(batch.num_rows(), &partition_columns)?;
         let values = self.evaluate_args(batch)?;
 
+        let sort_options: Vec<SortOptions> =
+            self.order_by.iter().map(|o| o.options).collect();
         let columns = self.sort_columns(batch)?;
-        let array_refs: Vec<&ArrayRef> = columns.iter().map(|s| &s.values).collect();
+        let order_columns: Vec<&ArrayRef> = columns.iter().map(|s| &s.values).collect();
         // Sort values, this will make the same partitions consecutive. Also, within the partition
         // range, values will be sorted.
-        let results = partition_points
-            .iter()
-            .map(|partition_range| {
-                let mut window_accumulators = self.create_accumulator()?;
-                Ok(vec![window_accumulators.scan(
-                    &values,
-                    &array_refs,
-                    partition_range,
-                )?])
-            })
-            .collect::<Result<Vec<Vec<ArrayRef>>>>()?
-            .into_iter()
-            .flatten()
-            .collect::<Vec<ArrayRef>>();
-        let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
-        concat(&results).map_err(DataFusionError::ArrowError)
-    }
-
-    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
-        &self.partition_by
-    }
-
-    fn order_by(&self) -> &[PhysicalSortExpr] {
-        &self.order_by
-    }
-}
-
-fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
-    range_columns: &[ArrayRef],
-    sort_options: &[SortOptions],
-    idx: usize,
-    delta: Option<&ScalarValue>,
-) -> Result<usize> {
-    let current_row_values = range_columns
-        .iter()
-        .map(|col| ScalarValue::try_from_array(col, idx))
-        .collect::<Result<Vec<ScalarValue>>>()?;
-    let end_range = if let Some(delta) = delta {
-        let is_descending: bool = sort_options
-            .first()
-            .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
-            .descending;
-
-        current_row_values
-            .iter()
-            .map(|value| {
-                if value.is_null() {
-                    return Ok(value.clone());
-                }
-                if SEARCH_SIDE == is_descending {
-                    // TODO: Handle positive overflows
-                    value.add(delta)
-                } else if value.is_unsigned() && value < delta {
-                    // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
-                    //       If we decide to implement a "default" construction mechanism for ScalarValue,
-                    //       change the following statement to use that.
-                    value.sub(value)
+        let order_bys = &order_columns[self.partition_by.len()..];
+        let window_frame = if !order_bys.is_empty() && self.window_frame.is_none() {
+            // OVER (ORDER BY a) case
+            // We create an implicit window for ORDER BY.
+            Some(Arc::new(WindowFrame::default()))
+        } else {
+            self.window_frame.clone()
+        };
+        let mut row_wise_results: Vec<ScalarValue> = vec![];
+        for partition_range in &partition_points {

Review Comment:
   This reorganization is very nice and makes the code much easier to read. Very nice 👍



##########
datafusion/physical-expr/src/window/window_expr.rs:
##########
@@ -110,4 +115,208 @@ pub trait WindowExpr: Send + Sync + Debug {
         sort_columns.extend(order_by_columns);
         Ok(sort_columns)
     }
+
+    /// We use start and end bounds to calculate current row's starting and ending range.
+    /// This function supports different modes, but we currently do not support window calculation for GROUPS inside window frames.
+    fn calculate_range(
+        &self,
+        window_frame: &Option<Arc<WindowFrame>>,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        if let Some(window_frame) = window_frame {
+            match window_frame.units {
+                WindowFrameUnits::Range => {
+                    let start = match &window_frame.start_bound {
+                        // UNBOUNDED PRECEDING
+                        WindowFrameBound::Preceding(n) => {
+                            if n.is_null() {
+                                0
+                            } else {
+                                calculate_index_of_row::<true, true>(
+                                    range_columns,
+                                    sort_options,
+                                    idx,
+                                    Some(n),
+                                )?
+                            }
+                        }
+                        WindowFrameBound::CurrentRow => {
+                            if range_columns.is_empty() {
+                                0
+                            } else {
+                                calculate_index_of_row::<true, true>(
+                                    range_columns,
+                                    sort_options,
+                                    idx,
+                                    None,
+                                )?
+                            }
+                        }
+                        WindowFrameBound::Following(n) => {
+                            calculate_index_of_row::<true, false>(
+                                range_columns,
+                                sort_options,
+                                idx,
+                                Some(n),
+                            )?
+                        }
+                    };
+                    let end = match &window_frame.end_bound {
+                        WindowFrameBound::Preceding(n) => {
+                            calculate_index_of_row::<false, true>(
+                                range_columns,
+                                sort_options,
+                                idx,
+                                Some(n),
+                            )?
+                        }
+                        WindowFrameBound::CurrentRow => {
+                            if range_columns.is_empty() {
+                                length
+                            } else {
+                                calculate_index_of_row::<false, false>(
+                                    range_columns,
+                                    sort_options,
+                                    idx,
+                                    None,
+                                )?
+                            }
+                        }
+                        WindowFrameBound::Following(n) => {
+                            if n.is_null() {
+                                // UNBOUNDED FOLLOWING
+                                length
+                            } else {
+                                calculate_index_of_row::<false, false>(
+                                    range_columns,
+                                    sort_options,
+                                    idx,
+                                    Some(n),
+                                )?
+                            }
+                        }
+                    };
+                    Ok((start, end))
+                }
+                WindowFrameUnits::Rows => {
+                    let start = match window_frame.start_bound {
+                        // UNBOUNDED PRECEDING
+                        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+                        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                            if idx >= n as usize {
+                                idx - n as usize
+                            } else {
+                                0
+                            }
+                        }
+                        WindowFrameBound::Preceding(_) => {
+                            return Err(DataFusionError::Internal(
+                                "Rows should be Uint".to_string(),
+                            ))
+                        }
+                        WindowFrameBound::CurrentRow => idx,
+                        // UNBOUNDED FOLLOWING
+                        WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                            return Err(DataFusionError::Internal(format!(
+                                "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                                window_frame
+                            )))
+                        }
+                        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                            min(idx + n as usize, length)
+                        }
+                        WindowFrameBound::Following(_) => {
+                            return Err(DataFusionError::Internal(
+                                "Rows should be Uint".to_string(),
+                            ))
+                        }
+                    };
+                    let end = match window_frame.end_bound {
+                        // UNBOUNDED PRECEDING
+                        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                            return Err(DataFusionError::Internal(format!(
+                                "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                                window_frame
+                            )))
+                        }
+                        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                            if idx >= n as usize {
+                                idx - n as usize + 1
+                            } else {
+                                0
+                            }
+                        }
+                        WindowFrameBound::Preceding(_) => {
+                            return Err(DataFusionError::Internal(
+                                "Rows should be Uint".to_string(),
+                            ))
+                        }
+                        WindowFrameBound::CurrentRow => idx + 1,
+                        // UNBOUNDED FOLLOWING
+                        WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+                        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                            min(idx + n as usize + 1, length)
+                        }
+                        WindowFrameBound::Following(_) => {
+                            return Err(DataFusionError::Internal(
+                                "Rows should be Uint".to_string(),
+                            ))
+                        }
+                    };
+                    Ok((start, end))
+                }
+                WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented(
+                    "Window frame for groups is not implemented".to_string(),

Review Comment:
   👍
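
For readers skimming the hunk, the `ROWS` branch can be condensed into the following standalone sketch. It assumes plain `usize` offsets in place of `ScalarValue::UInt64` and `String` errors in place of `DataFusionError`; `Bound` and `rows_frame` are hypothetical names, not part of DataFusion.

```rust
use std::cmp::min;

/// Simplified ROWS frame bound; `None` stands for UNBOUNDED.
/// (Hypothetical stand-in for `WindowFrameBound`.)
#[derive(Clone, Copy, Debug)]
enum Bound {
    Preceding(Option<usize>),
    CurrentRow,
    Following(Option<usize>),
}

/// Returns the half-open row range `[start, end)` for the row at `idx`
/// in a partition of `length` rows.
fn rows_frame(
    start: Bound,
    end: Bound,
    idx: usize,
    length: usize,
) -> Result<(usize, usize), String> {
    let start = match start {
        Bound::Preceding(None) => 0,
        // Clamp at the first row of the partition.
        Bound::Preceding(Some(n)) => idx.saturating_sub(n),
        Bound::CurrentRow => idx,
        Bound::Following(None) => {
            return Err("Frame start cannot be UNBOUNDED FOLLOWING".to_string())
        }
        // Clamp at the end of the partition.
        Bound::Following(Some(n)) => min(idx.saturating_add(n), length),
    };
    let end = match end {
        Bound::Preceding(None) => {
            return Err("Frame end cannot be UNBOUNDED PRECEDING".to_string())
        }
        // The end is exclusive, hence the `+ 1`.
        Bound::Preceding(Some(n)) => {
            if idx >= n {
                idx - n + 1
            } else {
                0
            }
        }
        Bound::CurrentRow => idx + 1,
        Bound::Following(None) => length,
        Bound::Following(Some(n)) => min(idx.saturating_add(n).saturating_add(1), length),
    };
    Ok((start, end))
}
```

For example, `ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING` on row 0 of a 5-row partition gives `(0, 2)`, and on row 3 it gives `(1, 5)`.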



##########
datafusion/physical-expr/src/window/window_expr.rs:
##########
@@ -110,4 +115,208 @@ pub trait WindowExpr: Send + Sync + Debug {
         sort_columns.extend(order_by_columns);
         Ok(sort_columns)
     }
+
+    /// We use start and end bounds to calculate current row's starting and ending range.
+    /// This function supports different modes, but we currently do not support window calculation for GROUPS inside window frames.
+    fn calculate_range(

Review Comment:
   I find this logic to be very straightforward and easy to follow 👍
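
The `RANGE` branch delegates to `calculate_index_of_row`, whose body is only partly visible in this thread. As a rough illustration of the general idea (not the PR's implementation), the sketch below finds `RANGE` frame bounds by binary search. It assumes a single non-null `i64` ORDER BY column sorted in ascending order and non-negative offsets; `range_frame` is a hypothetical name.

```rust
/// Half-open frame `[start, end)` for
/// `RANGE BETWEEN preceding PRECEDING AND following FOLLOWING`
/// over a column sorted in ascending order.
/// Assumes `idx < sorted.len()` and non-negative offsets.
fn range_frame(sorted: &[i64], idx: usize, preceding: i64, following: i64) -> (usize, usize) {
    let current = sorted[idx];
    // Saturating arithmetic avoids overflow at the ends of the i64 domain.
    let low = current.saturating_sub(preceding);
    let high = current.saturating_add(following);
    // First row whose value is >= low.
    let start = sorted.partition_point(|&v| v < low);
    // First row whose value is > high (exclusive end).
    let end = sorted.partition_point(|&v| v <= high);
    (start, end)
}
```

Unlike `ROWS`, the bounds here depend on the values rather than on row positions, so rows that tie with the current row on the ORDER BY column always fall inside the frame together.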





[GitHub] [arrow-datafusion] alamb commented on pull request #4078: Custom window frame support extended to built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4078:
URL: https://github.com/apache/arrow-datafusion/pull/4078#issuecomment-1304501063

   Again, really nice work @mustafasrepo and @ozankabak -- thank you very much




[GitHub] [arrow-datafusion] alamb merged pull request #4078: Custom window frame support extended to built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb merged PR #4078:
URL: https://github.com/apache/arrow-datafusion/pull/4078




[GitHub] [arrow-datafusion] alamb commented on pull request #4078: Custom window frame support extended to built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4078:
URL: https://github.com/apache/arrow-datafusion/pull/4078#issuecomment-1303701172

   I'll plan to merge this PR tomorrow unless there are other comments raised. 

