Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 14:00:32 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request, #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

yjshen opened a new pull request, #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171

   # Which issue does this PR close?
   
   Closes #.
   
   # Rationale for this change
   - [Tests](https://github.com/blaze-init/spark-blaze-extension/runs/5847348229?check_suite_focus=true) with Coalesce in Blaze seem to run forever because Coalesce iterates over the rows and recomputes the result per cell.
   - `if_then_else` in `case.rs` could be replaced by the `zip` kernel in `arrow::compute::kernels::zip`.
   
   # What changes are included in this PR?
   - Rework Coalesce evaluation so that it no longer evaluates per cell.
   - Use `zip` in `CaseWhen` evaluation.
   
   # Are there any user-facing changes?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171#discussion_r845450645


##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -19,7 +19,8 @@ use std::{any::Any, sync::Arc};
 
 use crate::expressions::try_cast;
 use crate::PhysicalExpr;
-use arrow::array::{self, *};
+use arrow::array::*;
+use arrow::compute::kernels::zip::zip;

Review Comment:
   It certainly isn't the same as `zip` on Rust's iterators, so I agree the name is confusing.
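
To make the naming point concrete, here is a minimal stdlib-only sketch of the two behaviours. It is a model under stated assumptions, not the arrow-rs implementation: `select_by_mask` and `pair_up` are hypothetical names, arrays are modelled as slices of `Option<T>`, and the mask is a plain `&[bool]` with no nulls. The arrow `zip` kernel takes values from its second argument where the mask is true and from its third argument where it is false, whereas iterator `zip` only pairs elements up.

```rust
/// Stdlib-only model of a mask-driven select: for each row, take the value
/// from `truthy` where the mask is true, otherwise from `falsy`.
/// `select_by_mask` is a hypothetical name, not the arrow-rs API.
fn select_by_mask<T: Clone>(
    mask: &[bool],
    truthy: &[Option<T>],
    falsy: &[Option<T>],
) -> Vec<Option<T>> {
    // All three inputs must describe the same number of rows.
    assert_eq!(mask.len(), truthy.len());
    assert_eq!(mask.len(), falsy.len());
    mask.iter()
        .zip(truthy.iter().zip(falsy.iter()))
        .map(|(&m, (t, f))| if m { t.clone() } else { f.clone() })
        .collect()
}

/// For contrast: iterator `zip` pairs rows up and selects nothing.
fn pair_up<T: Clone>(a: &[T], b: &[T]) -> Vec<(T, T)> {
    a.iter().cloned().zip(b.iter().cloned()).collect()
}
```

With the mask `[true, false, true]`, the first function keeps rows 0 and 2 from `truthy` (including a null, if that is what `truthy` holds there) and row 1 from `falsy`; the second function would instead return three pairs.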





[GitHub] [arrow-datafusion] liukun4515 commented on a diff in pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171#discussion_r844573204


##########
datafusion/physical-expr/src/conditional_expressions.rs:
##########
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         )));
     }
 
-    let size = match args[0] {
-        ColumnarValue::Array(ref a) => a.len(),
-        ColumnarValue::Scalar(ref _s) => 1,
-    };
-    let mut res = new_null_array(&args[0].data_type(), size);
+    let return_type = args[0].data_type();
+    let mut return_array = args.iter().filter_map(|x| match x {
+        ColumnarValue::Array(array) => Some(array.len()),
+        _ => None,

Review Comment:
   Why not keep the previous logic and return `Some(1)` for scalars?





[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171#discussion_r845175860


##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -19,7 +19,8 @@ use std::{any::Any, sync::Arc};
 
 use crate::expressions::try_cast;
 use crate::PhysicalExpr;
-use arrow::array::{self, *};
+use arrow::array::*;
+use arrow::compute::kernels::zip::zip;

Review Comment:
   I also didn't know about this one.
   
   `zip` seems like a confusing name for this functionality?





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171#discussion_r844248002


##########
datafusion/physical-expr/src/conditional_expressions.rs:
##########
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         )));
     }
 
-    let size = match args[0] {
-        ColumnarValue::Array(ref a) => a.len(),
-        ColumnarValue::Scalar(ref _s) => 1,
-    };
-    let mut res = new_null_array(&args[0].data_type(), size);
+    let return_type = args[0].data_type();
+    let mut return_array = args.iter().filter_map(|x| match x {
+        ColumnarValue::Array(array) => Some(array.len()),
+        _ => None,
+    });
+
+    if let Some(size) = return_array.next() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, size);
+        let mut remainder = BooleanArray::from(vec![true; size]);
 
-    for column_value in args {
-        for i in 0..size {
-            match column_value {
-                ColumnarValue::Array(array_ref) => {
-                    let curr_null_mask = compute::is_null(res.as_ref())?;
-                    let arr_not_null_mask = compute::is_not_null(array_ref)?;
-                    let bool_mask = compute::and(&curr_null_mask, &arr_not_null_mask)?;
-                    res = zip(&bool_mask, array_ref, &res)?;
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(ref array) => {
+                    let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
+                    current_value = zip(&to_apply, array, current_value.as_ref())?;

Review Comment:
   👍 



##########
datafusion/physical-expr/src/conditional_expressions.rs:
##########
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         )));
     }
 
-    let size = match args[0] {
-        ColumnarValue::Array(ref a) => a.len(),
-        ColumnarValue::Scalar(ref _s) => 1,
-    };
-    let mut res = new_null_array(&args[0].data_type(), size);
+    let return_type = args[0].data_type();
+    let mut return_array = args.iter().filter_map(|x| match x {
+        ColumnarValue::Array(array) => Some(array.len()),
+        _ => None,
+    });
+
+    if let Some(size) = return_array.next() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, size);
+        let mut remainder = BooleanArray::from(vec![true; size]);
 
-    for column_value in args {
-        for i in 0..size {
-            match column_value {
-                ColumnarValue::Array(array_ref) => {
-                    let curr_null_mask = compute::is_null(res.as_ref())?;
-                    let arr_not_null_mask = compute::is_not_null(array_ref)?;
-                    let bool_mask = compute::and(&curr_null_mask, &arr_not_null_mask)?;
-                    res = zip(&bool_mask, array_ref, &res)?;
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(ref array) => {
+                    let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
+                    current_value = zip(&to_apply, array, current_value.as_ref())?;
+                    remainder = and(&remainder, &is_null(array)?)?;
                 }
-                ColumnarValue::Scalar(scalar) => {
-                    if !scalar.is_null() && res.is_null(i) {
-                        let vec: Vec<bool> =
-                            (0..size).into_iter().map(|j| j == i).collect();
-                        let bool_arr = BooleanArray::from(vec);
-                        res =
-                            zip(&bool_arr, scalar.to_array_of_size(size).as_ref(), &res)?;
+                ColumnarValue::Scalar(value) => {
+                    if value.is_null() {
                         continue;
+                    } else {
+                        let last_value = value.to_array_of_size(size);
+                        current_value =
+                            zip(&remainder, &last_value, current_value.as_ref())?;
+                        break;

Review Comment:
   👍  early return on constant



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -19,7 +19,8 @@ use std::{any::Any, sync::Arc};
 
 use crate::expressions::try_cast;
 use crate::PhysicalExpr;
-use arrow::array::{self, *};
+use arrow::array::*;
+use arrow::compute::kernels::zip::zip;

Review Comment:
   Today I Learned: https://docs.rs/arrow/11.1.0/arrow/compute/kernels/zip/fn.zip.html ✅ 



##########
datafusion/physical-expr/src/conditional_expressions.rs:
##########
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         )));
     }
 
-    let size = match args[0] {
-        ColumnarValue::Array(ref a) => a.len(),
-        ColumnarValue::Scalar(ref _s) => 1,
-    };
-    let mut res = new_null_array(&args[0].data_type(), size);
+    let return_type = args[0].data_type();
+    let mut return_array = args.iter().filter_map(|x| match x {
+        ColumnarValue::Array(array) => Some(array.len()),
+        _ => None,
+    });
+
+    if let Some(size) = return_array.next() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, size);
+        let mut remainder = BooleanArray::from(vec![true; size]);
 
-    for column_value in args {
-        for i in 0..size {
-            match column_value {
-                ColumnarValue::Array(array_ref) => {
-                    let curr_null_mask = compute::is_null(res.as_ref())?;
-                    let arr_not_null_mask = compute::is_not_null(array_ref)?;
-                    let bool_mask = compute::and(&curr_null_mask, &arr_not_null_mask)?;
-                    res = zip(&bool_mask, array_ref, &res)?;
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(ref array) => {
+                    let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
+                    current_value = zip(&to_apply, array, current_value.as_ref())?;
+                    remainder = and(&remainder, &is_null(array)?)?;
                 }
-                ColumnarValue::Scalar(scalar) => {
-                    if !scalar.is_null() && res.is_null(i) {
-                        let vec: Vec<bool> =
-                            (0..size).into_iter().map(|j| j == i).collect();
-                        let bool_arr = BooleanArray::from(vec);
-                        res =
-                            zip(&bool_arr, scalar.to_array_of_size(size).as_ref(), &res)?;
+                ColumnarValue::Scalar(value) => {
+                    if value.is_null() {
                         continue;
+                    } else {
+                        let last_value = value.to_array_of_size(size);
+                        current_value =
+                            zip(&remainder, &last_value, current_value.as_ref())?;
+                        break;
                     }
                 }
             }
+            if remainder.iter().all(|x| x == Some(false)) {

Review Comment:
   There are fancier, more optimized ways in the guts of the arrow-rs implementation for testing whether a bitmask is entirely false. Maybe we can expose them if more optimization is needed.
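
As a rough illustration of what a word-at-a-time check could look like, here is a stdlib-only sketch. It assumes a bit-packed mask stored as `u64` words with the least significant bit first; `pack_bits`, `all_false` and `count_true` are hypothetical helpers written for this example and are not arrow-rs functions.

```rust
/// Packs booleans into 64-bit words, least significant bit first.
/// Unused bits in the last word stay zero.
fn pack_bits(bits: &[bool]) -> Vec<u64> {
    let mut words = vec![0u64; (bits.len() + 63) / 64];
    for (i, &bit) in bits.iter().enumerate() {
        if bit {
            words[i / 64] |= 1u64 << (i % 64);
        }
    }
    words
}

/// True when no bit is set. Each comparison covers 64 rows at once,
/// instead of visiting every row as `iter().all(..)` on a boolean array does.
fn all_false(words: &[u64]) -> bool {
    words.iter().all(|&w| w == 0)
}

/// Number of set bits, using the hardware popcount per word.
fn count_true(words: &[u64]) -> usize {
    words.iter().map(|w| w.count_ones() as usize).sum()
}
```

Because the padding bits are always zero in this sketch, comparing whole words against zero gives the same answer as checking every row.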





[GitHub] [arrow-datafusion] yjshen commented on pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
yjshen commented on PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171#issuecomment-1091536479

   Thanks @alamb @liukun4515




[GitHub] [arrow-datafusion] yjshen merged pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
yjshen merged PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171




[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171#discussion_r845180292


##########
datafusion/physical-expr/src/conditional_expressions.rs:
##########
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         )));
     }
 
-    let size = match args[0] {
-        ColumnarValue::Array(ref a) => a.len(),
-        ColumnarValue::Scalar(ref _s) => 1,
-    };
-    let mut res = new_null_array(&args[0].data_type(), size);
+    let return_type = args[0].data_type();
+    let mut return_array = args.iter().filter_map(|x| match x {
+        ColumnarValue::Array(array) => Some(array.len()),
+        _ => None,
+    });
+
+    if let Some(size) = return_array.next() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, size);
+        let mut remainder = BooleanArray::from(vec![true; size]);
 
-    for column_value in args {
-        for i in 0..size {
-            match column_value {
-                ColumnarValue::Array(array_ref) => {
-                    let curr_null_mask = compute::is_null(res.as_ref())?;
-                    let arr_not_null_mask = compute::is_not_null(array_ref)?;
-                    let bool_mask = compute::and(&curr_null_mask, &arr_not_null_mask)?;
-                    res = zip(&bool_mask, array_ref, &res)?;
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(ref array) => {
+                    let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
+                    current_value = zip(&to_apply, array, current_value.as_ref())?;
+                    remainder = and(&remainder, &is_null(array)?)?;
                 }
-                ColumnarValue::Scalar(scalar) => {
-                    if !scalar.is_null() && res.is_null(i) {
-                        let vec: Vec<bool> =
-                            (0..size).into_iter().map(|j| j == i).collect();
-                        let bool_arr = BooleanArray::from(vec);
-                        res =
-                            zip(&bool_arr, scalar.to_array_of_size(size).as_ref(), &res)?;
+                ColumnarValue::Scalar(value) => {
+                    if value.is_null() {
                         continue;
+                    } else {
+                        let last_value = value.to_array_of_size(size);
+                        current_value =
+                            zip(&remainder, &last_value, current_value.as_ref())?;
+                        break;
                     }
                 }
             }
+            if remainder.iter().all(|x| x == Some(false)) {

Review Comment:
   There also seem to be some opportunities to optimize `zip` in arrow-rs for all-true or all-false.
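
One possible shape for such a fast path, sketched with the standard library only. This is an assumption about what the optimization could look like, not a description of arrow-rs: `select_with_fast_paths` is a hypothetical function, and plain slices stand in for arrays. When the mask is uniform, one input can be returned as-is without copying any rows.

```rust
use std::borrow::Cow;

/// Mask-driven select with shortcuts for uniform masks.
/// All-true returns `truthy` borrowed, all-false returns `falsy` borrowed,
/// and only a mixed mask allocates a new buffer.
fn select_with_fast_paths<'a, T: Clone>(
    mask: &[bool],
    truthy: &'a [T],
    falsy: &'a [T],
) -> Cow<'a, [T]> {
    assert_eq!(mask.len(), truthy.len());
    assert_eq!(mask.len(), falsy.len());
    if mask.iter().all(|&m| m) {
        return Cow::Borrowed(truthy);
    }
    if mask.iter().all(|&m| !m) {
        return Cow::Borrowed(falsy);
    }
    // General case: pick row by row.
    Cow::Owned(
        mask.iter()
            .zip(truthy.iter().zip(falsy.iter()))
            .map(|(&m, (t, f))| if m { t.clone() } else { f.clone() })
            .collect(),
    )
}
```

In the coalesce loop this would matter most for the final non-null scalar, where the mask is often close to uniform.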





[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2171: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2171:
URL: https://github.com/apache/arrow-datafusion/pull/2171#discussion_r844576185


##########
datafusion/physical-expr/src/conditional_expressions.rs:
##########
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         )));
     }
 
-    let size = match args[0] {
-        ColumnarValue::Array(ref a) => a.len(),
-        ColumnarValue::Scalar(ref _s) => 1,
-    };
-    let mut res = new_null_array(&args[0].data_type(), size);
+    let return_type = args[0].data_type();
+    let mut return_array = args.iter().filter_map(|x| match x {
+        ColumnarValue::Array(array) => Some(array.len()),
+        _ => None,

Review Comment:
   `return_array` is used to decide whether to return a scalar or an array. If no argument is an array, `return_array.next()` yields `None`, so we know all arguments are scalars; we then enter the branch at line 70 and do a fast scalar coalesce.
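
The control flow described above can be modelled without arrow at all. The following is a minimal sketch under assumptions: `Value` is a hypothetical stand-in for `ColumnarValue` restricted to `i64`, `coalesce_model` is a made-up name, each inner loop stands in for one whole-array kernel call (`is_not_null`, `and`, `zip`), and the all-scalar branch is assumed to return the first non-null scalar.

```rust
/// Hypothetical stand-in for DataFusion's `ColumnarValue`, limited to i64.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Array(Vec<Option<i64>>),
    Scalar(Option<i64>),
}

fn coalesce_model(args: &[Value]) -> Value {
    // The first array argument, if any, fixes the output length.
    // `None` here means every argument is a scalar.
    let size = args.iter().find_map(|arg| match arg {
        Value::Array(values) => Some(values.len()),
        Value::Scalar(_) => None,
    });

    let size = match size {
        Some(size) => size,
        None => {
            // All-scalar path (assumed behaviour): first non-null scalar wins.
            let first = args.iter().find_map(|arg| match arg {
                Value::Scalar(Some(v)) => Some(*v),
                _ => None,
            });
            return Value::Scalar(first);
        }
    };

    // Start with nulls; `remainder[i]` is true while row i is still unfilled.
    let mut current: Vec<Option<i64>> = vec![None; size];
    let mut remainder = vec![true; size];

    for arg in args {
        match arg {
            Value::Array(values) => {
                assert_eq!(values.len(), size, "array arguments must share a length");
                for i in 0..size {
                    if remainder[i] && values[i].is_some() {
                        current[i] = values[i];
                        remainder[i] = false;
                    }
                }
            }
            // A null scalar can never fill a row.
            Value::Scalar(None) => continue,
            // A non-null scalar fills every remaining row, so we can stop.
            Value::Scalar(Some(v)) => {
                for i in 0..size {
                    if remainder[i] {
                        current[i] = Some(*v);
                    }
                }
                break;
            }
        }
        // Nothing left to fill: later arguments cannot change the result.
        if remainder.iter().all(|&r| !r) {
            break;
        }
    }
    Value::Array(current)
}
```

Each argument is visited once, so the work is proportional to the number of arguments times the number of rows, rather than repeating whole-array operations for every row.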


