You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2020/06/23 19:09:56 UTC
[arrow] branch master updated: ARROW-9158: [Rust][Datafusion]
projection physical plan compilation should preserve nullability
This is an automated email from the ASF dual-hosted git repository.
paddyhoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new cefc647 ARROW-9158: [Rust][Datafusion] projection physical plan compilation should preserve nullability
cefc647 is described below
commit cefc647dc1f0733b0299ccc04590d667fc3ac93b
Author: Qingping Hou <da...@gmail.com>
AuthorDate: Tue Jun 23 15:09:25 2020 -0400
ARROW-9158: [Rust][Datafusion] projection physical plan compilation should preserve nullability
Closes #7466 from houqp/ARROW-9158
Authored-by: Qingping Hou <da...@gmail.com>
Signed-off-by: Paddy Horan <pa...@hotmail.com>
---
rust/datafusion/src/execution/context.rs | 21 +++++++++++++++
.../src/execution/physical_plan/expressions.rs | 30 ++++++++++++++++++++++
rust/datafusion/src/execution/physical_plan/mod.rs | 12 ++++++++-
.../src/execution/physical_plan/projection.rs | 4 +--
rust/datafusion/src/execution/physical_plan/udf.rs | 4 +++
5 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 502000e..d142b0d 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -763,6 +763,27 @@ mod tests {
}
#[test]
+ fn preserve_nullability_on_projection() -> Result<()> {
+ let tmp_dir = TempDir::new("execute")?;
+ let ctx = create_ctx(&tmp_dir, 1)?;
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "state",
+ DataType::Utf8,
+ false,
+ )]));
+
+ let plan = LogicalPlanBuilder::scan("default", "test", schema.as_ref(), None)?
+ .project(vec![col("state")])?
+ .build()?;
+ let plan = ctx.optimize(&plan)?;
+ let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;
+ // "state" is non-nullable in the scan schema; the projection must keep it so.
+ assert!(!physical_plan.schema().field(0).is_nullable());
+ Ok(())
+ }
+
+ #[test]
fn projection_on_memory_scan() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs
index 73882fa..d26d050 100644
--- a/rust/datafusion/src/execution/physical_plan/expressions.rs
+++ b/rust/datafusion/src/execution/physical_plan/expressions.rs
@@ -72,6 +72,10 @@ impl PhysicalExpr for Alias {
self.expr.data_type(input_schema)
}
+ fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+ self.expr.nullable(input_schema)
+ }
+
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
self.expr.evaluate(batch)
}
@@ -104,6 +108,11 @@ impl PhysicalExpr for Column {
Ok(input_schema.field(self.index).data_type().clone())
}
+ /// Decide whether this expression is nullable, given the schema of the input
+ fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+ Ok(input_schema.field(self.index).is_nullable())
+ }
+
/// Evaluate the expression
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
Ok(batch.column(self.index).clone())
@@ -1023,6 +1032,11 @@ impl PhysicalExpr for BinaryExpr {
self.left.data_type(input_schema)
}
+ fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+ // a null in either operand yields a null result, so the output may be null
+ Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
+ }
+
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let left = self.left.evaluate(batch)?;
let right = self.right.evaluate(batch)?;
@@ -1106,6 +1120,11 @@ impl PhysicalExpr for NotExpr {
return Ok(DataType::Boolean);
}
+ fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+ // NOT NULL is NULL, so nullability follows the argument
+ self.arg.nullable(input_schema)
+ }
+
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let arg = self.arg.evaluate(batch)?;
if arg.data_type() != &DataType::Boolean {
@@ -1182,6 +1201,10 @@ impl PhysicalExpr for CastExpr {
Ok(self.cast_type.clone())
}
+ fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+ self.expr.nullable(input_schema)
+ }
+
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let value = self.expr.evaluate(batch)?;
Ok(cast(&value, &self.cast_type)?)
@@ -1221,6 +1244,13 @@ impl PhysicalExpr for Literal {
Ok(self.value.get_datatype())
}
+ fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+ match &self.value {
+ ScalarValue::Null => Ok(true),
+ _ => Ok(false),
+ }
+ }
+
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
match &self.value {
ScalarValue::Int8(value) => build_literal_array!(batch, Int8Builder, *value),
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 92e425e..b7da5e1 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex};
use crate::error::Result;
use crate::logicalplan::ScalarValue;
use arrow::array::ArrayRef;
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
/// Partition-aware execution plan for a relation
@@ -55,8 +55,18 @@ pub trait PhysicalExpr: Send + Sync {
fn name(&self) -> String;
/// Get the data type of this expression, given the schema of the input
fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+ /// Decide whether this expression is nullable, given the schema of the input
+ fn nullable(&self, input_schema: &Schema) -> Result<bool>;
/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
+ /// Build the schema `Field` (name, data type, nullability) for this expression
+ fn to_schema_field(&self, input_schema: &Schema) -> Result<Field> {
+ Ok(Field::new(
+ &self.name(),
+ self.data_type(input_schema)?,
+ self.nullable(input_schema)?,
+ ))
+ }
}
/// Agggregate expression that can be evaluated against a RecordBatch
diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs
index a0c588d..f1d7785 100644
--- a/rust/datafusion/src/execution/physical_plan/projection.rs
+++ b/rust/datafusion/src/execution/physical_plan/projection.rs
@@ -26,7 +26,7 @@ use crate::error::Result;
use crate::execution::physical_plan::{
BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
};
-use arrow::datatypes::{Field, Schema};
+use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
/// Execution plan for a projection
@@ -49,7 +49,7 @@ impl ProjectionExec {
let fields: Result<Vec<_>> = expr
.iter()
- .map(|e| Ok(Field::new(&e.name(), e.data_type(&input_schema)?, true)))
+ .map(|physical_expr| physical_expr.to_schema_field(&input_schema))
.collect();
let schema = Arc::new(Schema::new(fields?));
diff --git a/rust/datafusion/src/execution/physical_plan/udf.rs b/rust/datafusion/src/execution/physical_plan/udf.rs
index 1435e2d..9f888fa 100644
--- a/rust/datafusion/src/execution/physical_plan/udf.rs
+++ b/rust/datafusion/src/execution/physical_plan/udf.rs
@@ -93,6 +93,10 @@ impl PhysicalExpr for ScalarFunctionExpr {
Ok(self.return_type.clone())
}
+ fn nullable(&self, _schema: &Schema) -> Result<bool> {
+ Ok(true) // conservatively assume a UDF may return nulls
+ }
+
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
// evaluate the arguments
let inputs = self