You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2020/06/23 19:09:56 UTC

[arrow] branch master updated: ARROW-9158: [Rust][Datafusion] projection physical plan compilation should preserve nullability

This is an automated email from the ASF dual-hosted git repository.

paddyhoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new cefc647  ARROW-9158: [Rust][Datafusion] projection physical plan compilation should preserve nullability
cefc647 is described below

commit cefc647dc1f0733b0299ccc04590d667fc3ac93b
Author: Qingping Hou <da...@gmail.com>
AuthorDate: Tue Jun 23 15:09:25 2020 -0400

    ARROW-9158: [Rust][Datafusion] projection physical plan compilation should preserve nullability
    
    Closes #7466 from houqp/ARROW-9158
    
    Authored-by: Qingping Hou <da...@gmail.com>
    Signed-off-by: Paddy Horan <pa...@hotmail.com>
---
 rust/datafusion/src/execution/context.rs           | 21 +++++++++++++++
 .../src/execution/physical_plan/expressions.rs     | 30 ++++++++++++++++++++++
 rust/datafusion/src/execution/physical_plan/mod.rs | 12 ++++++++-
 .../src/execution/physical_plan/projection.rs      |  4 +--
 rust/datafusion/src/execution/physical_plan/udf.rs |  4 +++
 5 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 502000e..d142b0d 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -763,6 +763,27 @@ mod tests {
     }
 
     #[test]
+    fn preserve_nullability_on_projection() -> Result<()> {
+        let tmp_dir = TempDir::new("execute")?;
+        let ctx = create_ctx(&tmp_dir, 1)?;
+        // The source column is declared non-nullable; projecting it must keep that.
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "state",
+            DataType::Utf8,
+            false,
+        )]));
+
+        let plan = LogicalPlanBuilder::scan("default", "test", schema.as_ref(), None)?
+            .project(vec![col("state")])?
+            .build()?;
+
+        let plan = ctx.optimize(&plan)?;
+        let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;
+        assert!(!physical_plan.schema().field(0).is_nullable());
+        Ok(())
+    }
+
+    #[test]
     fn projection_on_memory_scan() -> Result<()> {
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, false),
diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs
index 73882fa..d26d050 100644
--- a/rust/datafusion/src/execution/physical_plan/expressions.rs
+++ b/rust/datafusion/src/execution/physical_plan/expressions.rs
@@ -72,6 +72,10 @@ impl PhysicalExpr for Alias {
         self.expr.data_type(input_schema)
     }
 
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.expr.nullable(input_schema)
+    }
+
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         self.expr.evaluate(batch)
     }
@@ -104,6 +108,11 @@ impl PhysicalExpr for Column {
         Ok(input_schema.field(self.index).data_type().clone())
     }
 
+    /// Decide whether this expression is nullable, given the schema of the input
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        Ok(input_schema.field(self.index).is_nullable())
+    }
+
     /// Evaluate the expression
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         Ok(batch.column(self.index).clone())
@@ -1023,6 +1032,11 @@ impl PhysicalExpr for BinaryExpr {
         self.left.data_type(input_schema)
     }
 
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        // null operands propagate to the result, for arithmetic and comparisons alike
+        Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
+    }
+
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         let left = self.left.evaluate(batch)?;
         let right = self.right.evaluate(batch)?;
@@ -1106,6 +1120,11 @@ impl PhysicalExpr for NotExpr {
         return Ok(DataType::Boolean);
     }
 
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        // NOT NULL is NULL, so the result is nullable exactly when the operand is
+        self.arg.nullable(input_schema)
+    }
+
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         let arg = self.arg.evaluate(batch)?;
         if arg.data_type() != &DataType::Boolean {
@@ -1182,6 +1201,10 @@ impl PhysicalExpr for CastExpr {
         Ok(self.cast_type.clone())
     }
 
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.expr.nullable(input_schema)
+    }
+
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         let value = self.expr.evaluate(batch)?;
         Ok(cast(&value, &self.cast_type)?)
@@ -1221,6 +1244,13 @@ impl PhysicalExpr for Literal {
         Ok(self.value.get_datatype())
     }
 
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        match &self.value {
+            ScalarValue::Null => Ok(true),
+            _ => Ok(false),
+        }
+    }
+
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         match &self.value {
             ScalarValue::Int8(value) => build_literal_array!(batch, Int8Builder, *value),
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 92e425e..b7da5e1 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex};
 use crate::error::Result;
 use crate::logicalplan::ScalarValue;
 use arrow::array::ArrayRef;
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
 
 /// Partition-aware execution plan for a relation
@@ -55,8 +55,18 @@ pub trait PhysicalExpr: Send + Sync {
     fn name(&self) -> String;
     /// Get the data type of this expression, given the schema of the input
     fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+    /// Decide whether this expression is nullable, given the schema of the input
+    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
+    /// Build the schema `Field` (name, data type and nullability) for this expression
+    fn to_schema_field(&self, input_schema: &Schema) -> Result<Field> {
+        Ok(Field::new(
+            &self.name(),
+            self.data_type(input_schema)?,
+            self.nullable(input_schema)?,
+        ))
+    }
 }
 
 /// Agggregate expression that can be evaluated against a RecordBatch
diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs
index a0c588d..f1d7785 100644
--- a/rust/datafusion/src/execution/physical_plan/projection.rs
+++ b/rust/datafusion/src/execution/physical_plan/projection.rs
@@ -26,7 +26,7 @@ use crate::error::Result;
 use crate::execution::physical_plan::{
     BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
 };
-use arrow::datatypes::{Field, Schema};
+use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
 
 /// Execution plan for a projection
@@ -49,7 +49,7 @@ impl ProjectionExec {
 
         let fields: Result<Vec<_>> = expr
             .iter()
-            .map(|e| Ok(Field::new(&e.name(), e.data_type(&input_schema)?, true)))
+            .map(|phys_expr| phys_expr.to_schema_field(&input_schema))
             .collect();
 
         let schema = Arc::new(Schema::new(fields?));
diff --git a/rust/datafusion/src/execution/physical_plan/udf.rs b/rust/datafusion/src/execution/physical_plan/udf.rs
index 1435e2d..9f888fa 100644
--- a/rust/datafusion/src/execution/physical_plan/udf.rs
+++ b/rust/datafusion/src/execution/physical_plan/udf.rs
@@ -93,6 +93,10 @@ impl PhysicalExpr for ScalarFunctionExpr {
         Ok(self.return_type.clone())
     }
 
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(true) // conservatively assume the function may return nulls, whatever its inputs
+    }
+
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         // evaluate the arguments
         let inputs = self