You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/23 09:50:45 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

yjshen opened a new pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068


   # Which issue does this PR close?
   
   
   Closes #2064 .
   
    # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] doki23 commented on a change in pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

Posted by GitBox <gi...@apache.org>.
doki23 commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833872460



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       We can evaluate `case_when` from beginning to the end and use evaluate_selection for the when_expr so that we can omit the following computation of when expr of the record which when_expr is already true.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833077973



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       Currently, we are evaluating `case_when` from the end to the beginning. This is problematic since **short-circuit evaluation** is adopted by most engines, including PostgreSQL, Oracle, and SQL Server. Therefore, chances are users would express computation logic that would fail in later `when_thens` for tuples that **should have been** computed and bypassed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r835490088



##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {

Review comment:
       Do you think it is worth optimizing away the `take` (`copy`) when `selection` is all true?

##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);
+        let tmp_columns = batch
+            .columns()
+            .iter()
+            .map(|c| {
+                take(c.as_ref(), &indices, None)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))

Review comment:
       I think there is an `ArrowError` here you could use too: https://github.com/apache/arrow-datafusion/blob/master/datafusion-common/src/error.rs#L44
   
   You may not even have to use `map_err` at all (maybe ?) is sufficient
   
   

##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);

Review comment:
       I assume you can't just update the `null` mask of the source batch to be `null` where validity is `false` because things like the divide kernel will still throw runtime exceptions if the data is 0?

##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);
+        let tmp_columns = batch
+            .columns()
+            .iter()
+            .map(|c| {
+                take(c.as_ref(), &indices, None)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))
+            })
+            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
+
+        let tmp_batch = RecordBatch::try_new(batch.schema(), tmp_columns)?;
+        let tmp_result = self.evaluate(&tmp_batch)?;
+        if let ColumnarValue::Array(a) = tmp_result {
+            let result = scatter(selection, a.as_ref())?;
+            Ok(ColumnarValue::Array(result))
+        } else {
+            Ok(tmp_result)
+        }
+    }
+}
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
+/// are taken, when the mask evaluates `false` values null values are filled.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy` values
+/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
+fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+    let truthy = truthy.data();
+
+    let mut mutable = MutableArrayData::new(vec![&*truthy], true, mask.len());

Review comment:
       I am probably missing something here but this code looks like it always creates `BooleanArrays` even when `truthy` is some other type -- in the case examples you have, the resulting expression is always boolean, but I wonder if this is always the case
   
   Perhaps it is worth an `assert!` that `truthy.data_type() == DataType::Boolean`?
   
   Even better would be some unit tests showing how scatter worked (for boolean and non boolean arrays)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r835884359



##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);

Review comment:
       I was just thinking it might be possible to do something like the following psuedo code:
   
   ```rust
   let mask = and(old_array.null_mask(), selection);
   let new_array = old_array.replace_null_mask(mask);
   let result = compute_expr(new_array);
   ```
   
   And skip having to scatter / gather
   
   However, given this code works and is covered  by tests maybe we cn revisit the approach if there is some performance or correctness issue in the future




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r835490032



##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array

Review comment:
       ```suggestion
       /// Evaluate an expression against a RecordBatch after first applying a
       /// validity array
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833068149



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       This test fails since we are currently evaluating `else` first for 
   ```SQL
   CASE expr WHEN value THEN result ELSE result END
   ```
   Regardless of a tuple has entered `when branches` before and should bypass `else`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb merged pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r835729606



##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);
+        let tmp_columns = batch
+            .columns()
+            .iter()
+            .map(|c| {
+                take(c.as_ref(), &indices, None)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))
+            })
+            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
+
+        let tmp_batch = RecordBatch::try_new(batch.schema(), tmp_columns)?;
+        let tmp_result = self.evaluate(&tmp_batch)?;
+        if let ColumnarValue::Array(a) = tmp_result {
+            let result = scatter(selection, a.as_ref())?;
+            Ok(ColumnarValue::Array(result))
+        } else {
+            Ok(tmp_result)
+        }
+    }
+}
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
+/// are taken, when the mask evaluates `false` values null values are filled.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy` values
+/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
+fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+    let truthy = truthy.data();
+
+    let mut mutable = MutableArrayData::new(vec![&*truthy], true, mask.len());

Review comment:
       It's extending with `mutable.extend(0, true_pos, true_pos + len);` array from index 0 (the only truthy array), so the result is of the same type with truthy. Test added as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833077973



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       Also, we are evaluating `case_when` from the end to the beginning. This is problematic since **short-circuit evaluation** is adopted by most engines, including PostgreSQL, Oracle, and SQL Server. Therefore, chances are users would express computation logic that would fail in later `when_thens` for tuples that **should have been** computed previously and bypassed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] doki23 commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
doki23 commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833893451



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -323,24 +323,19 @@ impl CaseExpr {
         let base_value = expr.evaluate(batch)?;
         let base_type = expr.data_type(&batch.schema())?;
         let base_value = base_value.into_array(batch.num_rows());
+        let base_nulls = is_null(base_value.as_ref())?;
 
-        // start with the else condition, or nulls
-        let mut current_value: Option<ArrayRef> = if let Some(e) = &self.else_expr {
-            // keep `else_expr`'s data type and return type consistent
-            let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
-                .unwrap_or_else(|_| e.clone());
-            Some(expr.evaluate(batch)?.into_array(batch.num_rows()))
-        } else {
-            Some(new_null_array(&return_type, batch.num_rows()))
-        };
-
-        // walk backwards through the when/then expressions
-        for i in (0..self.when_then_expr.len()).rev() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, batch.num_rows());
+        // We only consider non-null values while comparing with whens
+        let mut remainder = not(&base_nulls)?;
+        for i in 0..self.when_then_expr.len() {
             let i = i as usize;
 
-            let when_value = self.when_then_expr[i].0.evaluate(batch)?;
+            let when_value = self.when_then_expr[i]
+                .0
+                .evaluate_selection(batch, &remainder)?;

Review comment:
       👍 Great! It's short-circuit now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] doki23 commented on a change in pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

Posted by GitBox <gi...@apache.org>.
doki23 commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833872460



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       We can evaluate `case_when` from beginning to the end and use `evaluate_selection` for the `when_expr` so that we can omit the following computation of the record whose `when_expr` is already true.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] doki23 commented on a change in pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

Posted by GitBox <gi...@apache.org>.
doki23 commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833872460



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       We can evaluate `case_when` from beginning to the end and use `evaluate_selection` for the `when_expr` so that we can omit the following computation of the record whose `when_expr` is already true. Do I get your idea?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r835729606



##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);
+        let tmp_columns = batch
+            .columns()
+            .iter()
+            .map(|c| {
+                take(c.as_ref(), &indices, None)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))
+            })
+            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
+
+        let tmp_batch = RecordBatch::try_new(batch.schema(), tmp_columns)?;
+        let tmp_result = self.evaluate(&tmp_batch)?;
+        if let ColumnarValue::Array(a) = tmp_result {
+            let result = scatter(selection, a.as_ref())?;
+            Ok(ColumnarValue::Array(result))
+        } else {
+            Ok(tmp_result)
+        }
+    }
+}
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
+/// are taken, when the mask evaluates `false` values null values are filled.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy` values
+/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
+fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+    let truthy = truthy.data();
+
+    let mut mutable = MutableArrayData::new(vec![&*truthy], true, mask.len());

Review comment:
       It's extending with `mutable.extend(0, true_pos, true_pos + len);` array from index 0 (the only truthy array), so the result is of the same type with truthy.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r835741197



##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);

Review comment:
       No, I think divide kernel works correctly to deal with only valid indices. 
   Are you suggesting I should create new RecordBatch by masking the current batch instead of the take-then-scatter way? Should I create bitmaps from existing ones for each array with the help of `arrow::bit_util`, or do I miss something handy?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#issuecomment-1079899070


   Thanks @doki23 for the review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] doki23 commented on a change in pull request #2068: Short-circuit evaluation for `CaseWhen`

Posted by GitBox <gi...@apache.org>.
doki23 commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r834894674



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -323,39 +323,46 @@ impl CaseExpr {
         let base_value = expr.evaluate(batch)?;
         let base_type = expr.data_type(&batch.schema())?;
         let base_value = base_value.into_array(batch.num_rows());
+        let base_nulls = is_null(base_value.as_ref())?;
 
-        // start with the else condition, or nulls
-        let mut current_value: Option<ArrayRef> = if let Some(e) = &self.else_expr {
-            // keep `else_expr`'s data type and return type consistent
-            let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
-                .unwrap_or_else(|_| e.clone());
-            Some(expr.evaluate(batch)?.into_array(batch.num_rows()))
-        } else {
-            Some(new_null_array(&return_type, batch.num_rows()))
-        };
-
-        // walk backwards through the when/then expressions
-        for i in (0..self.when_then_expr.len()).rev() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, batch.num_rows());
+        // We only consider non-null values while comparing with whens
+        let mut remainder = not(&base_nulls)?;
+        for i in 0..self.when_then_expr.len() {
             let i = i as usize;
 

Review comment:
       ```suggestion
   ```

##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -368,44 +375,54 @@ impl CaseExpr {
     fn case_when_no_expr(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let return_type = self.when_then_expr[0].1.data_type(&batch.schema())?;
 
-        // start with the else condition, or nulls
-        let mut current_value: Option<ArrayRef> = if let Some(e) = &self.else_expr {
-            let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
-                .unwrap_or_else(|_| e.clone());
-            Some(expr.evaluate(batch)?.into_array(batch.num_rows()))
-        } else {
-            Some(new_null_array(&return_type, batch.num_rows()))
-        };
-
-        // walk backwards through the when/then expressions
-        for i in (0..self.when_then_expr.len()).rev() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, batch.num_rows());
+        let mut remainder = BooleanArray::from(vec![true; batch.num_rows()]);
+        for i in 0..self.when_then_expr.len() {
             let i = i as usize;
 

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833077973



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       Also, we are evaluating `case_when` from the end to the beginning. This is problematic since **short-circuit evaluation** is adopted by most engines, including PostgreSQL, Oracle, and SQL Server. Therefore, chances are users would express computation logic that would fail in later `when_thens` for tuples that **should have been** computed and bypassed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2068: WIP: case when should only evaluate `then` clause for true `when`'s

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r833068149



##########
File path: datafusion-physical-expr/src/expressions/case.rs
##########
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64)  END
+        let when1 = lit(ScalarValue::Int32(Some(0)));

Review comment:
       This test would fail since we are currently evaluating `else` first for `CASE expr WHEN value THEN result ELSE result END`, regardless of whether the when branches are entered.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org