You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/01 11:59:52 UTC

[GitHub] [arrow] alamb commented on a change in pull request #9038: ARROW-10356: [Rust][DataFusion] Add support for is_in (WIP)

alamb commented on a change in pull request #9038:
URL: https://github.com/apache/arrow/pull/9038#discussion_r550760379



##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -2414,6 +2417,212 @@ impl PhysicalSortExpr {
     }
 }
 
+/// InList
+#[derive(Debug)]
+pub struct InListExpr {
+    expr: Arc<dyn PhysicalExpr>,
+    list: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+macro_rules! make_contains {
+    ($ARRAY:expr, $LIST_VALUES:expr, $SCALAR_VALUE:ident, $ARRAY_TYPE:ident) => {{
+        let array = $ARRAY.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+
+        let values = $LIST_VALUES
+            .iter()
+            .map(|expr| match expr {
+                ColumnarValue::Scalar(s) => match s {
+                    ScalarValue::$SCALAR_VALUE(Some(v)) => v,
+                    datatype => unimplemented!("Unexpected type {} for InList", datatype),
+                },
+                ColumnarValue::Array(_) => {
+                    unimplemented!("InList should not receive Array")
+                }
+            })
+            .collect::<Vec<_>>();
+
+        Ok(ColumnarValue::Array(Arc::new(
+            array
+                .iter()
+                .map(|x| x.map(|x| values.contains(&&x)))
+                .collect::<BooleanArray>(),
+        )))
+    }};
+}
+
+impl InListExpr {
+    /// Create a new InList expression
+    pub fn new(expr: Arc<dyn PhysicalExpr>, list: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+        Self { expr, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: StringOffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list_values: Vec<ColumnarValue>,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        let values = list_values
+            .iter()
+            .map(|expr| match expr {
+                ColumnarValue::Scalar(s) => match s {
+                    ScalarValue::Utf8(Some(v)) => v.as_str(),
+                    datatype => unimplemented!("Unexpected type {} for InList", datatype),
+                },
+                ColumnarValue::Array(_) => {
+                    unimplemented!("InList should not receive Array")

Review comment:
       this should probably generate an error earlier in planning too (e.g. if you see an expression like `my_col IN (my_other_col, 'foo')`)

##########
File path: rust/datafusion/src/logical_plan/expr.rs
##########
@@ -158,6 +158,15 @@ pub enum Expr {
         /// List of expressions to feed to the functions as arguments
         args: Vec<Expr>,
     },
+    /// Returns whether the list contains the expr value.
+    InList {
+        /// The value to compare
+        expr: Box<Expr>,
+        /// The low end of the range
+        list: Vec<Expr>,

Review comment:
       I think the rationale / idea (largely expressed by @jorgecarleitao ) was that actual type coercion happens during physical planning (so that we could potentially have different backend physical planning mechanisms but the same logical mechanisms).
   
   You could potentially use the coercion logic here: https://github.com/apache/arrow/blob/master/rust/datafusion/src/physical_plan/type_coercion.rs#L118
   
   And coerce all of the IN list items to the same type.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -2414,6 +2417,212 @@ impl PhysicalSortExpr {
     }
 }
 
+/// InList
+#[derive(Debug)]
+pub struct InListExpr {
+    expr: Arc<dyn PhysicalExpr>,
+    list: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+macro_rules! make_contains {
+    ($ARRAY:expr, $LIST_VALUES:expr, $SCALAR_VALUE:ident, $ARRAY_TYPE:ident) => {{
+        let array = $ARRAY.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+
+        let values = $LIST_VALUES
+            .iter()
+            .map(|expr| match expr {
+                ColumnarValue::Scalar(s) => match s {
+                    ScalarValue::$SCALAR_VALUE(Some(v)) => v,
+                    datatype => unimplemented!("Unexpected type {} for InList", datatype),
+                },
+                ColumnarValue::Array(_) => {
+                    unimplemented!("InList should not receive Array")
+                }
+            })
+            .collect::<Vec<_>>();
+
+        Ok(ColumnarValue::Array(Arc::new(
+            array
+                .iter()
+                .map(|x| x.map(|x| values.contains(&&x)))

Review comment:
       I wonder if this handles `NULL` correctly -- for example, for a row where `expr` is NULL the output should be NULL (not true/false). The semantics when there is a literal `NULL` in the IN list are even stranger (but could likely be handled in a follow-on PR)
   
   For example:
   
   ```
   sqlite> create table t(c1 int);
   sqlite> insert into t values (10);
   sqlite> insert into t values (20);
   sqlite> insert into t values(NULL);
   sqlite> select c1, c1 IN (20, NULL) from t;
   10|
   20|1
   |
   sqlite> select c1, c1 IN (20) from t;
   10|0
   20|1
   |
   ```
   
   Note that `10 IN (20, NULL)` is actually `NULL` rather than `FALSE`. Crazy
   

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -2414,6 +2417,129 @@ impl PhysicalSortExpr {
     }
 }
 
+/// Not expression
+#[derive(Debug)]
+pub struct InListExpr {
+    expr: Arc<dyn PhysicalExpr>,
+    list: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+impl InListExpr {
+    /// Create a new InList function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, list: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+        Self { expr, list }
+    }
+
+    fn compare_utf8(
+        &self,
+        array: Arc<dyn Array>,
+        list_values: Vec<ColumnarValue>,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<i32>>()
+            .unwrap();
+
+        // create the ListArray expected by comparison::contains_utf8
+        let values_builder = StringBuilder::new(list_values.len());
+        let mut builder = ListBuilder::new(values_builder);
+        for _ in 0..array.len() {
+            list_values.iter().for_each(|expr| match expr {
+                ColumnarValue::Scalar(s) => match s {
+                    ScalarValue::Utf8(Some(v)) => {
+                        builder.values().append_value(v).unwrap();
+                    }
+                    _ => unimplemented!("not yet implemented"),
+                },
+                ColumnarValue::Array(_) => {
+                    unimplemented!("InList should not receive Array")
+                }
+            });
+            builder.append(true).unwrap();
+        }
+        let list_array = builder.finish();
+
+        Ok(ColumnarValue::Array(Arc::new(
+            kernels::comparison::contains_utf8(array, &list_array)
+                .map_err(DataFusionError::ArrowError)?,
+        )))
+    }
+}
+
+impl fmt::Display for InListExpr {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{} IN ({:?})", self.expr, self.list)
+    }
+}
+
+impl PhysicalExpr for InListExpr {
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.expr.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = self.expr.evaluate(batch)?;
+        let value_data_type = value.data_type();
+
+        let list_values = self
+            .list
+            .iter()
+            .map(|expr| expr.evaluate(batch))
+            .collect::<Result<Vec<_>>>()?;
+        let list_values_data_types = list_values
+            .iter()
+            .map(|expr| expr.data_type())
+            .collect::<Vec<DataType>>();
+
+        if list_values_data_types
+            .iter()
+            .any(|dt| *dt != value_data_type)
+        {
+            return Err(DataFusionError::Internal(format!(

Review comment:
       I suggest that the appropriate place would be to "coerce" all the IN list item types to the same data type during Logical --> Physical plan creation.

##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -761,6 +761,28 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 high: Box::new(self.sql_expr_to_logical_expr(&high)?),
             }),
 
+            SQLExpr::InList {
+                ref expr,
+                ref list,
+                ref negated,
+            } => {
+                let list_expr = list
+                    .iter()
+                    .map(|e| self.sql_expr_to_logical_expr(e))

Review comment:
       Here is where I think you could add the type coercion / checking logic

##########
File path: rust/datafusion/tests/sql.rs
##########
@@ -1849,3 +1849,45 @@ async fn string_expressions() -> Result<()> {
     assert_eq!(expected, actual);
     Ok(())
 }
+
+#[tokio::test]

Review comment:
       Since there is also support in this PR for numeric types, I would also suggest some basic tests for IN lists with numbers as well (e.g. `c1 IN (1, 2, 3)` as well as `c1 IN (1, NULL)`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org