Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/16 16:45:01 UTC

[GitHub] [arrow-datafusion] ovr opened a new pull request, #2549: feat: Initial support for AnyExpression

ovr opened a new pull request, #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549

   # Which issue does this PR close?
   
   https://github.com/apache/arrow-datafusion/issues/2548
   
   Closes #.
   
   # Rationale for this change
   
   This PR implements partial support for the `ANY` operator, limited to the `=` and `<>` comparisons.
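
For readers unfamiliar with the operator, here is a minimal sketch of how `=` and `<>` are usually read when combined with `ANY`, using SQL-style three-valued logic. The helper names `eq_any` and `neq_any` are hypothetical and are not part of DataFusion or of this PR. `None` stands for SQL NULL, and treating a NULL left-hand side as always yielding NULL is a simplifying assumption.

```rust
/// `value = ANY(list)`: true if at least one element equals `value`.
/// Hypothetical helper for illustration only; `None` models SQL NULL.
fn eq_any(value: Option<i64>, list: &[Option<i64>]) -> Option<bool> {
    // Simplifying assumption: a NULL left-hand side always yields NULL.
    let v = value?;
    let mut saw_null = false;
    for item in list {
        match item {
            Some(x) if *x == v => return Some(true), // one match is enough
            Some(_) => {}
            None => saw_null = true,
        }
    }
    // No match found: the result is unknown if a NULL element could have matched.
    if saw_null { None } else { Some(false) }
}

/// `value <> ANY(list)`: true if at least one element differs from `value`.
fn neq_any(value: Option<i64>, list: &[Option<i64>]) -> Option<bool> {
    let v = value?;
    let mut saw_null = false;
    for item in list {
        match item {
            Some(x) if *x != v => return Some(true), // one differing element is enough
            Some(_) => {}
            None => saw_null = true,
        }
    }
    if saw_null { None } else { Some(false) }
}
```

Under this reading, `<> ANY` is not the negation of `= ANY`: `neq_any(Some(1), &[Some(1), Some(2)])` and `eq_any(Some(1), &[Some(1), Some(2)])` are both true.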
   
   # Are there any user-facing changes?
   
   This PR doesn't introduce any breaking changes, only new functionality.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1382717546

   This PR is more than 6 months old, so I am closing it for now to clean up the PR list. Please reopen it if this is a mistake and you plan to work on it more.




[GitHub] [arrow-datafusion] ovr commented on pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
ovr commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1145082922

   Rebased ✅ cc @alamb @tustvold




[GitHub] [arrow-datafusion] andygrove commented on pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1145471022

   I reviewed the logical plan, SQL parsing, and optimizer changes and LGTM. I did not review the execution part.
   
   It might make reviews easier for future work like this to have separate PRs for logical versus physical parts. I would have been more comfortable reviewing and approving the logical plan parts of this.




[GitHub] [arrow-datafusion] andygrove commented on pull request #2549: feat: Initial support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1129257522

   > @alamb @andygrove is it required to add a new expression for Ballista? I did that in [218a917](https://github.com/apache/arrow-datafusion/pull/2549/commits/218a9177cd625990387ca183995b89bd489c5f5a), but I don't use it. Thanks
   
   I assume you had to add that to fix compilation issues because ballista has exhaustive matching of all expressions? The changes look reasonable to me. 
   
   Note that we also have a `datafusion.proto` (in fact, we have two copies, as noted in https://github.com/apache/arrow-datafusion/issues/2514) that we should ideally also add new expressions to (but we have not been doing this consistently).
   
   




[GitHub] [arrow-datafusion] ovr commented on a diff in pull request #2549: feat: Initial support for AnyExpression

Posted by GitBox <gi...@apache.org>.
ovr commented on code in PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#discussion_r875250090


##########
datafusion/physical-expr/src/expressions/any.rs:
##########
@@ -0,0 +1,570 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Any expression
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
+    PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::ArrowPrimitiveType;
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+
+use crate::expressions::try_cast;
+use crate::PhysicalExpr;
+use arrow::array::*;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{ColumnarValue, Operator};
+
+macro_rules! compare_op_scalar {
+    ($LEFT: expr, $LIST_VALUES:expr, $OP:expr, $LIST_VALUES_TYPE:ty, $LIST_FROM_SCALAR: expr) => {{
+        let mut builder = BooleanBuilder::new($LEFT.len());
+
+        if $LIST_FROM_SCALAR {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(0) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(0)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        } else {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(i) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(i)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish())
+    }};
+}
+
+macro_rules! make_primitive {
+    ($VALUES:expr, $IN_VALUES:expr, $NEGATED:expr, $TYPE:ident, $LIST_FROM_SCALAR: expr) => {{
+        let left = $VALUES.as_any().downcast_ref::<$TYPE>().expect(&format!(
+            "Unable to downcast values to {}",
+            stringify!($TYPE)
+        ));
+
+        if $NEGATED {
+            Ok(ColumnarValue::Array(Arc::new(neq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        }
+    }};
+}
+
+fn eq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn neq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| !v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn eq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn neq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn eq_utf8<OffsetSize: StringOffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+fn neq_utf8<OffsetSize: StringOffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+/// AnyExpr
+#[derive(Debug)]
+pub struct AnyExpr {
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+}
+
+impl AnyExpr {
+    /// Create a new InList expression
+    pub fn new(
+        value: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        list: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self { value, op, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: StringOffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+
+    /// Get the left side of the binary expression
+    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.value
+    }
+
+    /// Get the right side of the binary expression
+    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.list
+    }
+
+    /// Get the operator for this binary expression
+    pub fn op(&self) -> &Operator {
+        &self.op
+    }
+
+    /// Compare for boolean types
+    fn compare_bool(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+}
+
+impl std::fmt::Display for AnyExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} ANY({})", self.value, self.op, self.list)
+    }
+}
+
+impl PhysicalExpr for AnyExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.value.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = match self.value.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        };
+
+        let (list, list_from_scalar) = match self.list.evaluate(batch)? {
+            ColumnarValue::Array(array) => (array, false),
+            ColumnarValue::Scalar(scalar) => (scalar.to_array(), true),
+        };
+        let as_list = list
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .expect("Unable to downcast list to ListArray");
+
+        let negated = match self.op {
+            Operator::Eq => false,
+            Operator::NotEq => true,
+            op => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Operator for ANY expression, actual: {:?}",
+                    op
+                )));
+            }
+        };
+
+        match value.data_type() {
+            DataType::Float16 => {
+                make_primitive!(value, as_list, negated, Float16Array, list_from_scalar)
+            }
+            DataType::Float32 => {
+                make_primitive!(value, as_list, negated, Float32Array, list_from_scalar)
+            }
+            DataType::Float64 => {
+                make_primitive!(value, as_list, negated, Float64Array, list_from_scalar)
+            }
+            DataType::Int8 => {
+                make_primitive!(value, as_list, negated, Int8Array, list_from_scalar)
+            }
+            DataType::Int16 => {
+                make_primitive!(value, as_list, negated, Int16Array, list_from_scalar)
+            }
+            DataType::Int32 => {
+                make_primitive!(value, as_list, negated, Int32Array, list_from_scalar)
+            }
+            DataType::Int64 => {
+                make_primitive!(value, as_list, negated, Int64Array, list_from_scalar)
+            }
+            DataType::UInt8 => {
+                make_primitive!(value, as_list, negated, UInt8Array, list_from_scalar)
+            }
+            DataType::UInt16 => {
+                make_primitive!(value, as_list, negated, UInt16Array, list_from_scalar)
+            }
+            DataType::UInt32 => {
+                make_primitive!(value, as_list, negated, UInt32Array, list_from_scalar)
+            }
+            DataType::UInt64 => {
+                make_primitive!(value, as_list, negated, UInt64Array, list_from_scalar)
+            }
+            DataType::Boolean => {
+                self.compare_bool(value, as_list, negated, list_from_scalar)
+            }
+            DataType::Utf8 => {
+                self.compare_utf8::<i32>(value, as_list, negated, list_from_scalar)
+            }
+            DataType::LargeUtf8 => {
+                self.compare_utf8::<i64>(value, as_list, negated, list_from_scalar)
+            }
+            datatype => Result::Err(DataFusionError::NotImplemented(format!(
+                "AnyExpr does not support datatype {:?}.",
+                datatype
+            ))),
+        }
+    }
+}
+
+/// return two physical expressions that are optionally coerced to a
+/// common type that the binary operator supports.
+fn any_cast(
+    value: Arc<dyn PhysicalExpr>,
+    _op: &Operator,
+    list: Arc<dyn PhysicalExpr>,
+    input_schema: &Schema,
+) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
+    let tmp = list.data_type(input_schema)?;
+    let list_type = match &tmp {
+        DataType::List(f) => f.data_type(),
+        _ => panic!("wtf"),

Review Comment:
   Oops! My bad :D Fixed.
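
The thread does not show what the fix looked like. As a hedged sketch of one common way to replace a `panic!` in a type-coercion helper, the non-list case can be reported as an error value instead. `DataType` and `PlanError` below are hypothetical, simplified stand-ins (not arrow's or DataFusion's real types) so that the example compiles on its own.

```rust
// Hypothetical, simplified stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Utf8,
    List(Box<DataType>),
}

// Hypothetical stand-in for a planner error type.
#[derive(Debug, PartialEq)]
enum PlanError {
    Plan(String),
}

/// Return the element type of a list type, or an error for any other type,
/// so that a malformed query produces a message instead of aborting.
fn list_element_type(data_type: &DataType) -> Result<&DataType, PlanError> {
    match data_type {
        DataType::List(inner) => Ok(inner.as_ref()),
        other => Err(PlanError::Plan(format!(
            "ANY expects a list on the right-hand side, got {:?}",
            other
        ))),
    }
}
```

The caller can then propagate the error with `?`, which keeps a bad input from taking down the whole process.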





[GitHub] [arrow-datafusion] ovr commented on pull request #2549: feat: Initial support for AnyExpression

Posted by GitBox <gi...@apache.org>.
ovr commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1128000859

   @alamb @andygrove is it required to add a new expression for Ballista? I did that in [d5dfa12](https://github.com/apache/arrow-datafusion/pull/2549/commits/d5dfa12a38b11963d019acf865d441a8d0e49697), but I don't use it. Thanks




[GitHub] [arrow-datafusion] alamb commented on pull request #2549: feat: Initial support for AnyExpression

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1129774517

   I plan to review this carefully today -- thanks @ovr 




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#discussion_r875910788


##########
datafusion/physical-expr/src/expressions/any.rs:
##########
@@ -0,0 +1,674 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Any expression
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
+    PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::ArrowPrimitiveType;
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+
+use crate::expressions::try_cast;
+use crate::PhysicalExpr;
+use arrow::array::*;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{ColumnarValue, Operator};
+
+macro_rules! compare_op_scalar {
+    ($LEFT: expr, $LIST_VALUES:expr, $OP:expr, $LIST_VALUES_TYPE:ty, $LIST_FROM_SCALAR: expr) => {{
+        let mut builder = BooleanBuilder::new($LEFT.len());
+
+        if $LIST_FROM_SCALAR {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(0) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(0)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        } else {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(i) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(i)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish())
+    }};
+}
+
+macro_rules! make_primitive {
+    ($VALUES:expr, $IN_VALUES:expr, $NEGATED:expr, $TYPE:ident, $LIST_FROM_SCALAR: expr) => {{
+        let left = $VALUES.as_any().downcast_ref::<$TYPE>().expect(&format!(
+            "Unable to downcast values to {}",
+            stringify!($TYPE)
+        ));
+
+        if $NEGATED {
+            Ok(ColumnarValue::Array(Arc::new(neq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        }
+    }};
+}
+
+fn eq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn neq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| !v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn eq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn neq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn eq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+fn neq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+/// AnyExpr
+#[derive(Debug)]
+pub struct AnyExpr {
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+}
+
+impl AnyExpr {
+    /// Create a new Any expression
+    pub fn new(
+        value: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        list: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self { value, op, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: OffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+
+    /// Get the left side of the binary expression
+    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.value
+    }
+
+    /// Get the right side of the binary expression
+    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.list
+    }
+
+    /// Get the operator for this binary expression
+    pub fn op(&self) -> &Operator {
+        &self.op
+    }
+
+    /// Compare for boolean types
+    fn compare_bool(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+}
+
+impl std::fmt::Display for AnyExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} ANY({})", self.value, self.op, self.list)
+    }
+}
+
+impl PhysicalExpr for AnyExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.value.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = match self.value.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        };

Review Comment:
   Thank you @ovr -- this is looking quite cool.
   
   I haven't reviewed this code super carefully but I wonder what you think about reusing more of the code for `InList`?
   
   https://github.com/cube-js/arrow-datafusion/blob/binary-df-pr/datafusion/physical-expr/src/expressions/in_list.rs#L439
   
   It seems like the implementations of `x IN (<expr>, <expr>)` and `x = ANY (<expr>)` are almost exactly the same, except that `x = ANY(<expr>)` has the list as a single `<expr>`?
   
   The implementation of `get_set` needs to be different, but otherwise the rest of the code could probably be called directly.
   
   I mention this because the `IN` / `NOT IN` implementation is likely more optimized (it uses a hash set for membership tests, for example) and is already written.
   
   Let me know what you think 
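
To illustrate the hash-set point above, here is a minimal sketch that contrasts a per-row linear scan with building a set once and probing it. It works on plain slices rather than Arrow arrays, and the function names are hypothetical; it is not the `InList` implementation.

```rust
use std::collections::HashSet;

/// Linear scan: every probed value walks the whole list.
fn contains_linear(list: &[i64], value: i64) -> bool {
    list.iter().any(|x| *x == value)
}

/// Build the set once, then probe each value in O(1) on average.
/// This only pays off when the same list is reused for many rows,
/// e.g. when the list comes from a scalar rather than a per-row array.
fn eq_any_hashed(values: &[i64], list: &[i64]) -> Vec<bool> {
    let set: HashSet<i64> = list.iter().copied().collect();
    values.iter().map(|v| set.contains(v)).collect()
}
```

For example, `eq_any_hashed(&[1, 5, 3], &[3, 1])` yields one boolean per input value. When the list differs per row, the set would have to be rebuilt each time and the advantage largely disappears.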





[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#discussion_r888917283


##########
datafusion/expr/src/expr.rs:
##########
@@ -99,6 +99,15 @@ pub enum Expr {
         /// Right-hand side of the expression
         right: Box<Expr>,
     },
+    /// A binary expression such as "age > 21"
+    AnyExpr {
+        /// Left-hand side of the expression
+        left: Box<Expr>,
+        /// The comparison operator
+        op: Operator,
+        /// Right-hand side of the expression
+        right: Box<Expr>,

Review Comment:
   The definition of `AnyExpr` here is identical to the definition of `BinaryExpr`. I wonder if it would be better to use `BinaryExpr` for consistency and model Any as `AnyExpr(Expr)` to just model the right-hand side?
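
A rough sketch of the modelling suggested above, using a hypothetical and heavily simplified `Expr` enum (the real enum in `datafusion-expr` has many more variants, and the names `Any`, `any_expr`, and `is_any_comparison` are assumptions made for this example):

```rust
#[derive(Debug, Clone, PartialEq)]
enum Operator {
    Eq,
    NotEq,
}

// Hypothetical miniature of the logical expression enum.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    BinaryExpr {
        left: Box<Expr>,
        op: Operator,
        right: Box<Expr>,
    },
    /// Wraps only the list-valued right-hand side of `x <op> ANY(list)`.
    Any(Box<Expr>),
}

/// Build `left <op> ANY(list)` by reusing `BinaryExpr` for the comparison.
fn any_expr(left: Expr, op: Operator, list: Expr) -> Expr {
    Expr::BinaryExpr {
        left: Box::new(left),
        op,
        right: Box::new(Expr::Any(Box::new(list))),
    }
}

/// Detect the ANY form while walking an expression tree.
fn is_any_comparison(expr: &Expr) -> bool {
    matches!(expr, Expr::BinaryExpr { right, .. } if matches!(**right, Expr::Any(_)))
}
```

With this shape, code that already handles `BinaryExpr` keeps working, and only passes that care about `ANY` need to inspect the right-hand side.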





[GitHub] [arrow-datafusion] andygrove commented on pull request #2549: feat: Initial support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1127913221

   I added the `api change` label since this introduces a new `AnyExpr` variant in the `Expr` enum, which is a breaking change.




[GitHub] [arrow-datafusion] alamb commented on pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1132855695

   Thanks -- I'll try and find some more time to review this PR.
   
   On Wed, May 18, 2022 at 5:54 PM Dmitry Patsura ***@***.***>
   wrote:
   
   > ***@***.**** commented on this pull request.
   > ------------------------------
   >
   > In datafusion/physical-expr/src/expressions/any.rs
   > <https://github.com/apache/arrow-datafusion/pull/2549#discussion_r876413237>
   > :
   >
   > > +    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
   > > +        let value = match self.value.evaluate(batch)? {
   > > +            ColumnarValue::Array(array) => array,
   > > +            ColumnarValue::Scalar(scalar) => scalar.to_array(),
   > > +        };
   >
   > > I haven't reviewed this code super carefully but I wonder what you think
   > > about reusing more of the code for InList?
   >
   > Probably the nearest operator is ALL, but it's not implemented right
   > now. I tried to reuse the InList code, but came to the decision that it
   > will be easier and more correct to combine it with ANY in the future.
   >
   > > It seems like the implementations of x IN (, ) and x ANY () are almost
   > > exactly the same except for the fact that the x = ANY() has the list as
   > > a single ?
   >
   > Nope, because ANY supports more operators: <, <=, >, >= (they are not
   > implemented in this PR, but anyway).
   >
   > > I mention this as it is likely the IN / NOT IN are more optimized (use a
   > > hashset for testing, for example) as well as already written.
   >
   > Yes, but IN supports only a vector of scalars in DF. That's not the same
   > as ANY, which supports either a List (a real column) or a scalar.
   >
   > For example, SELECT left, values FROM table:
   >
   >     left | right List(Int64)    (different logic is required)
   >     1    | [1,2,3]              (ArrayRef = PrimitiveArray<Int64>; different logic is required)
   >     2    | [2,3,4]
   >     3    | [4,5,6]
   >     4    | [4,5,6]
   >     5    | [4,5,6]
   >
   > When a whole column is used, iterating over the values ☝️ requires
   > different handling. It would probably be more correct to go step by step:
   >
   >    - Support more operators
   >    - Introduce the ALL operator and extract the logic it shares with ANY.
   >
   > Thanks
   >
   




[GitHub] [arrow-datafusion] andygrove commented on pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1162451730

   Hi @ovr, what do you think about breaking this up into smaller PRs, starting with the logical plan changes and SQL parsing? I think this might make it easier to start getting parts of this merged.




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#discussion_r888177353


##########
datafusion/core/tests/sql/expr.rs:
##########
@@ -955,6 +955,30 @@ async fn test_extract_date_part() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_binary_any() -> Result<()> {

Review Comment:
   Is it worth testing what happens if `NULL` is in the list?
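
   For reference, a sketch of the results such a test might check if the PR is meant to follow PostgreSQL-style three-valued logic. That is an assumption; the thread does not say which semantics are intended. `None` stands for NULL, and the function name `eq_any` is hypothetical. A NULL list as a whole is not modeled here.

```rust
/// `value = ANY(list)` under SQL three-valued logic, with `None` standing for NULL.
/// Returns Some(true) if any element equals the value, None (unknown) if no match
/// was found but a NULL took part in a comparison, and Some(false) otherwise.
fn eq_any(value: Option<i64>, list: &[Option<i64>]) -> Option<bool> {
    let mut saw_null = false;
    for item in list {
        match (value, item) {
            // A definite match decides the result immediately.
            (Some(v), Some(i)) if v == *i => return Some(true),
            // A definite mismatch: keep looking.
            (Some(_), Some(_)) => {}
            // Either side is NULL, so this comparison is unknown.
            _ => saw_null = true,
        }
    }
    if saw_null {
        None
    } else {
        Some(false)
    }
}
```

   Under these assumptions, `4 = ANY([1, NULL])` is NULL rather than false, which is the case a plain `contains` check over the underlying values would likely get wrong.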



##########
datafusion/physical-expr/src/expressions/any.rs:
##########
@@ -0,0 +1,674 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Any expression
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
+    PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::ArrowPrimitiveType;
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+
+use crate::expressions::try_cast;
+use crate::PhysicalExpr;
+use arrow::array::*;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{ColumnarValue, Operator};
+
+macro_rules! compare_op_scalar {
+    ($LEFT: expr, $LIST_VALUES:expr, $OP:expr, $LIST_VALUES_TYPE:ty, $LIST_FROM_SCALAR: expr) => {{
+        let mut builder = BooleanBuilder::new($LEFT.len());
+
+        if $LIST_FROM_SCALAR {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(0) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(0)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        } else {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(i) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(i)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish())
+    }};
+}
+
+macro_rules! make_primitive {
+    ($VALUES:expr, $IN_VALUES:expr, $NEGATED:expr, $TYPE:ident, $LIST_FROM_SCALAR: expr) => {{
+        let left = $VALUES.as_any().downcast_ref::<$TYPE>().expect(&format!(
+            "Unable to downcast values to {}",
+            stringify!($TYPE)
+        ));
+
+        if $NEGATED {
+            Ok(ColumnarValue::Array(Arc::new(neq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        }
+    }};
+}
+
+fn eq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn neq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| !v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn eq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn neq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn eq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+fn neq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+/// AnyExpr
+#[derive(Debug)]
+pub struct AnyExpr {
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+}
+
+impl AnyExpr {
+    /// Create a new Any expression
+    pub fn new(
+        value: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        list: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self { value, op, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: OffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+
+    /// Get the left side of the binary expression
+    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.value
+    }
+
+    /// Get the right side of the binary expression
+    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.list
+    }
+
+    /// Get the operator for this binary expression
+    pub fn op(&self) -> &Operator {
+        &self.op
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_bool(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+}
+
+impl std::fmt::Display for AnyExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} ANY({})", self.value, self.op, self.list)
+    }
+}
+
+impl PhysicalExpr for AnyExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.value.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = match self.value.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        };
+
+        let (list, list_from_scalar) = match self.list.evaluate(batch)? {
+            ColumnarValue::Array(array) => (array, false),
+            ColumnarValue::Scalar(scalar) => (scalar.to_array(), true),
+        };
+        let as_list = list

Review Comment:
   I'm probably missing something fundamental, but I'm confused as to why this is a ListArray, i.e. 2-dimensional.
   
   In all the examples given the expression is `[1,2,3]` which is 1 dimensional?
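
   One possible reading, based on the author's reply elsewhere in this thread that ANY accepts either a list column or a scalar list: the right-hand side holds one list per row, so the column as a whole is a list of lists. The sketch below models both cases with plain vectors; the function names are hypothetical and this is not the PR's code.

```rust
/// Right-hand side is a column: row `i` of `values` is checked against `lists[i]`.
fn eq_any_column(values: &[i64], lists: &[Vec<i64>]) -> Vec<bool> {
    values
        .iter()
        .zip(lists)
        .map(|(v, list)| list.contains(v))
        .collect()
}

/// Right-hand side is a single scalar list: every row is checked against the same list.
fn eq_any_scalar(values: &[i64], list: &[i64]) -> Vec<bool> {
    values.iter().map(|v| list.contains(v)).collect()
}
```

   In the scalar case the "outer" dimension has length one, which appears to be what the `list_from_scalar` flag in the diff distinguishes.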



##########
datafusion/physical-expr/src/expressions/any.rs:
##########
@@ -0,0 +1,674 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Any expression
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
+    PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::ArrowPrimitiveType;
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+
+use crate::expressions::try_cast;
+use crate::PhysicalExpr;
+use arrow::array::*;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{ColumnarValue, Operator};
+
+macro_rules! compare_op_scalar {
+    ($LEFT: expr, $LIST_VALUES:expr, $OP:expr, $LIST_VALUES_TYPE:ty, $LIST_FROM_SCALAR: expr) => {{
+        let mut builder = BooleanBuilder::new($LEFT.len());
+
+        if $LIST_FROM_SCALAR {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(0) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(0)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        } else {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(i) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(i)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish())
+    }};
+}
+
+macro_rules! make_primitive {
+    ($VALUES:expr, $IN_VALUES:expr, $NEGATED:expr, $TYPE:ident, $LIST_FROM_SCALAR: expr) => {{
+        let left = $VALUES.as_any().downcast_ref::<$TYPE>().expect(&format!(
+            "Unable to downcast values to {}",
+            stringify!($TYPE)
+        ));
+
+        if $NEGATED {
+            Ok(ColumnarValue::Array(Arc::new(neq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        }
+    }};
+}
+
+fn eq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn neq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| !v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn eq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn neq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn eq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+fn neq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+/// AnyExpr
+#[derive(Debug)]
+pub struct AnyExpr {
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+}
+
+impl AnyExpr {
+    /// Create a new Any expression
+    pub fn new(
+        value: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        list: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self { value, op, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: OffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+
+    /// Get the left side of the binary expression
+    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.value
+    }
+
+    /// Get the right side of the binary expression
+    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.list
+    }
+
+    /// Get the operator for this binary expression
+    pub fn op(&self) -> &Operator {
+        &self.op
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_bool(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+}
+
+impl std::fmt::Display for AnyExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} ANY({})", self.value, self.op, self.list)
+    }
+}
+
+impl PhysicalExpr for AnyExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.value.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = match self.value.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        };
+
+        let (list, list_from_scalar) = match self.list.evaluate(batch)? {
+            ColumnarValue::Array(array) => (array, false),
+            ColumnarValue::Scalar(scalar) => (scalar.to_array(), true),
+        };
+        let as_list = list
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .expect("Unable to downcast list to ListArray");
+
+        let negated = match self.op {
+            Operator::Eq => false,
+            Operator::NotEq => true,
+            op => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Operator for ANY expression, actual: {:?}",
+                    op
+                )));
+            }
+        };
+
+        match value.data_type() {
+            DataType::Float16 => {
+                make_primitive!(value, as_list, negated, Float16Array, list_from_scalar)
+            }
+            DataType::Float32 => {
+                make_primitive!(value, as_list, negated, Float32Array, list_from_scalar)
+            }
+            DataType::Float64 => {
+                make_primitive!(value, as_list, negated, Float64Array, list_from_scalar)
+            }
+            DataType::Int8 => {
+                make_primitive!(value, as_list, negated, Int8Array, list_from_scalar)
+            }
+            DataType::Int16 => {
+                make_primitive!(value, as_list, negated, Int16Array, list_from_scalar)
+            }
+            DataType::Int32 => {
+                make_primitive!(value, as_list, negated, Int32Array, list_from_scalar)
+            }
+            DataType::Int64 => {
+                make_primitive!(value, as_list, negated, Int64Array, list_from_scalar)
+            }
+            DataType::UInt8 => {
+                make_primitive!(value, as_list, negated, UInt8Array, list_from_scalar)
+            }
+            DataType::UInt16 => {
+                make_primitive!(value, as_list, negated, UInt16Array, list_from_scalar)
+            }
+            DataType::UInt32 => {
+                make_primitive!(value, as_list, negated, UInt32Array, list_from_scalar)
+            }
+            DataType::UInt64 => {
+                make_primitive!(value, as_list, negated, UInt64Array, list_from_scalar)
+            }
+            DataType::Boolean => {
+                self.compare_bool(value, as_list, negated, list_from_scalar)
+            }
+            DataType::Utf8 => {
+                self.compare_utf8::<i32>(value, as_list, negated, list_from_scalar)
+            }
+            DataType::LargeUtf8 => {
+                self.compare_utf8::<i64>(value, as_list, negated, list_from_scalar)
+            }
+            datatype => Result::Err(DataFusionError::NotImplemented(format!(
+                "AnyExpr does not support datatype {:?}.",
+                datatype
+            ))),
+        }
+    }
+}
+
+/// return two physical expressions that are optionally coerced to a
+/// common type that the binary operator supports.
+fn any_cast(
+    value: Arc<dyn PhysicalExpr>,
+    _op: &Operator,
+    list: Arc<dyn PhysicalExpr>,
+    input_schema: &Schema,
+) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
+    let list_dt = list.data_type(input_schema)?;
+    let list_type = match &list_dt {
+        DataType::List(f) => f.data_type(),
+        dt => return Err(DataFusionError::Execution(format!(
+            "Unexpected type on the right side of ANY expression. Must be a List, actual: {}",
+            dt
+        ))),
+    };
+
+    Ok((try_cast(value, input_schema, list_type.clone())?, list))
+}
+
+/// Creates an expression AnyExpr
+pub fn any(
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+    input_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    let (l, r) = any_cast(value, &op, list, input_schema)?;
+    Ok(Arc::new(AnyExpr::new(l, op, r)))
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Field;
+
+    use super::*;
+    use crate::expressions::{col, lit};
+    use datafusion_common::{Result, ScalarValue};
+
+    // applies the any expr to an input batch
+    macro_rules! execute_any {
+        ($BATCH:expr, $OP:expr, $EXPECTED:expr, $COL_A:expr, $COL_B:expr, $SCHEMA:expr) => {{
+            let expr = any($COL_A, $OP, $COL_B, $SCHEMA).unwrap();
+            let result = expr.evaluate(&$BATCH)?.into_array($BATCH.num_rows());
+            let result = result
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .expect("failed to downcast to BooleanArray");
+            let expected = &BooleanArray::from($EXPECTED);
+            assert_eq!(expected, result);
+        }};
+    }
+
+    #[test]
+    fn any_int64_array_list() -> Result<()> {
+        let field_a = Field::new("a", DataType::Int64, true);
+        let field_b = Field::new(
+            "b",
+            DataType::List(Box::new(Field::new("item", DataType::Int64, true))),
+            true,
+        );
+
+        let schema = Schema::new(vec![field_a, field_b]);
+        let a = Int64Array::from(vec![Some(0), Some(3), None]);
+        let col_a = col("a", &schema)?;
+
+        let values_builder = Int64Builder::new(3 * 3);
+        let mut builder = ListBuilder::new(values_builder);

Review Comment:
   Using `ListArray::from_iter_primitive` can be easier to read than the builders.



##########
datafusion/expr/src/expr.rs:
##########
@@ -99,6 +99,15 @@ pub enum Expr {
         /// Right-hand side of the expression
         right: Box<Expr>,
     },
+    /// A binary expression such as "age > 21"
+    AnyExpr {
+        /// Left-hand side of the expression
+        left: Box<Expr>,
+        /// The comparison operator
+        op: Operator,
+        /// Right-hand side of the expression
+        right: Box<Expr>,

Review Comment:
   I'm guessing supporting an arbitrary subquery here is a future extension?



##########
datafusion/physical-expr/src/expressions/any.rs:
##########
@@ -0,0 +1,674 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Any expression
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
+    PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::ArrowPrimitiveType;
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+
+use crate::expressions::try_cast;
+use crate::PhysicalExpr;
+use arrow::array::*;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{ColumnarValue, Operator};
+
+macro_rules! compare_op_scalar {
+    ($LEFT: expr, $LIST_VALUES:expr, $OP:expr, $LIST_VALUES_TYPE:ty, $LIST_FROM_SCALAR: expr) => {{
+        let mut builder = BooleanBuilder::new($LEFT.len());
+
+        if $LIST_FROM_SCALAR {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(0) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(0)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        } else {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(i) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(i)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish())
+    }};
+}
+
+macro_rules! make_primitive {
+    ($VALUES:expr, $IN_VALUES:expr, $NEGATED:expr, $TYPE:ident, $LIST_FROM_SCALAR: expr) => {{
+        let left = $VALUES.as_any().downcast_ref::<$TYPE>().expect(&format!(
+            "Unable to downcast values to {}",
+            stringify!($TYPE)
+        ));
+
+        if $NEGATED {
+            Ok(ColumnarValue::Array(Arc::new(neq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        }
+    }};
+}
+
+fn eq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn neq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| !v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn eq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn neq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn eq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+fn neq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+/// AnyExpr
+#[derive(Debug)]
+pub struct AnyExpr {
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+}
+
+impl AnyExpr {
+    /// Create a new Any expression
+    pub fn new(
+        value: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        list: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self { value, op, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: OffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+
+    /// Get the left side of the binary expression
+    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.value
+    }
+
+    /// Get the right side of the binary expression
+    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.list
+    }
+
+    /// Get the operator for this binary expression
+    pub fn op(&self) -> &Operator {
+        &self.op
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_bool(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+}
+
+impl std::fmt::Display for AnyExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} ANY({})", self.value, self.op, self.list)
+    }
+}
+
+impl PhysicalExpr for AnyExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.value.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = match self.value.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        };
+
+        let (list, list_from_scalar) = match self.list.evaluate(batch)? {
+            ColumnarValue::Array(array) => (array, false),
+            ColumnarValue::Scalar(scalar) => (scalar.to_array(), true),
+        };
+        let as_list = list
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .expect("Unable to downcast list to ListArray");
+
+        let negated = match self.op {
+            Operator::Eq => false,
+            Operator::NotEq => true,
+            op => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Operator for ANY expression, actual: {:?}",
+                    op
+                )));
+            }
+        };
+
+        match value.data_type() {

Review Comment:
   Depending on whether this needs to use `ListArray`, this could possibly use the `dyn` comparison kernels in arrow to reduce the amount of code.
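
   For reference, the row-wise logic in the hunk above can be sketched without Arrow. This is only an illustration, not the PR's code: `any_rowwise` is a hypothetical name, `Option` stands in for Arrow nulls, and list elements are assumed to be non-null `i32` values.

```rust
/// Plain-Rust stand-in for the row-wise loop in `compare_op_scalar!`.
///
/// `left` models the value column and `lists` models the `ListArray`;
/// a `None` entry is a null value or a null list. When `list_from_scalar`
/// is true, only `lists[0]` is consulted for every row, as in the hunk.
/// `negated` mirrors the `<>` branch, which negates the membership test.
fn any_rowwise(
    left: &[Option<i32>],
    lists: &[Option<Vec<i32>>],
    negated: bool,
    list_from_scalar: bool,
) -> Vec<Option<bool>> {
    left.iter()
        .enumerate()
        .map(|(i, value)| {
            // Pick the single scalar list or the list belonging to this row.
            let list = if list_from_scalar { &lists[0] } else { &lists[i] };
            match (value, list) {
                // Both sides present: membership test, optionally negated.
                (Some(v), Some(items)) => Some(items.contains(v) != negated),
                // A null value or a null list yields a null result.
                _ => None,
            }
        })
        .collect()
}
```

   With `left = [1, NULL, 3]` and a scalar list `[1, 2]`, the non-negated call yields `[true, NULL, false]`.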





[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#discussion_r888513961


##########
datafusion/expr/src/expr.rs:
##########
@@ -99,6 +99,15 @@ pub enum Expr {
         /// Right-hand side of the expression
         right: Box<Expr>,
     },
+    /// A binary expression such as "age > 21"
+    AnyExpr {
+        /// Left-hand side of the expression
+        left: Box<Expr>,
+        /// The comparison operator
+        op: Operator,
+        /// Right-hand side of the expression
+        right: Box<Expr>,

Review Comment:
   The `Expr` here could be an `Expr::ScalarSubquery` to cover the subquery case (assuming that we only need to support subqueries that select a single column?)
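
   A rough sketch of what that might look like. The enums below are simplified, hypothetical stand-ins (the subquery is just a `String` here, and `List` and `any_rhs_is_subquery` are made up for illustration), not the real `datafusion_expr` types:

```rust
/// Hypothetical, reduced stand-in for the logical `Operator`.
#[derive(Debug, PartialEq)]
enum Operator {
    Eq,
    NotEq,
}

/// Hypothetical, reduced stand-in for the logical `Expr`.
#[derive(Debug, PartialEq)]
enum Expr {
    /// A column reference such as `age`
    Column(String),
    /// A literal list (illustrative only)
    List(Vec<i64>),
    /// A subquery that selects a single column (SQL text used as a placeholder)
    ScalarSubquery(String),
    /// `left op ANY(right)`; `right` can be a list or a subquery
    AnyExpr {
        left: Box<Expr>,
        op: Operator,
        right: Box<Expr>,
    },
}

/// Returns true when the right-hand side of an `AnyExpr` is a subquery,
/// which a planner could use to choose a different evaluation strategy.
fn any_rhs_is_subquery(expr: &Expr) -> bool {
    match expr {
        Expr::AnyExpr { right, .. } => {
            matches!(right.as_ref(), Expr::ScalarSubquery(_))
        }
        _ => false,
    }
}
```

   Since `right` is already a `Box<Expr>`, no new field would be needed on the variant under this assumption; only the planner would have to handle the extra case.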





[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1145069876

   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2549?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2549](https://codecov.io/gh/apache/arrow-datafusion/pull/2549?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2d32cb2) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/4b3eb1cc592c9ee306b8f60845632a66eb7eddba?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4b3eb1c) will **decrease** coverage by `0.09%`.
   > The diff coverage is `69.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #2549      +/-   ##
   ==========================================
   - Coverage   84.64%   84.54%   -0.10%     
   ==========================================
     Files         267      268       +1     
     Lines       46926    47239     +313     
   ==========================================
   + Hits        39719    39937     +218     
   - Misses       7207     7302      +95     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/2549?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/core/src/datasource/listing/helpers.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9kYXRhc291cmNlL2xpc3RpbmcvaGVscGVycy5ycw==) | `95.34% <ø> (ø)` | |
   | [...ion/core/src/optimizer/common\_subexpr\_eliminate.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9vcHRpbWl6ZXIvY29tbW9uX3N1YmV4cHJfZWxpbWluYXRlLnJz) | `87.81% <0.00%> (-0.74%)` | :arrow_down: |
   | [...afusion/core/src/optimizer/simplify\_expressions.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9vcHRpbWl6ZXIvc2ltcGxpZnlfZXhwcmVzc2lvbnMucnM=) | `81.90% <0.00%> (-0.10%)` | :arrow_down: |
   | [datafusion/core/src/optimizer/utils.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9vcHRpbWl6ZXIvdXRpbHMucnM=) | `31.77% <0.00%> (-0.49%)` | :arrow_down: |
   | [datafusion/expr/src/utils.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9leHByL3NyYy91dGlscy5ycw==) | `91.79% <ø> (ø)` | |
   | [datafusion/physical-expr/src/expressions/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9leHByZXNzaW9ucy9tb2QucnM=) | `100.00% <ø> (ø)` | |
   | [datafusion/proto/src/from\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9wcm90by9zcmMvZnJvbV9wcm90by5ycw==) | `33.94% <0.00%> (-0.16%)` | :arrow_down: |
   | [datafusion/proto/src/to\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9wcm90by9zcmMvdG9fcHJvdG8ucnM=) | `55.74% <0.00%> (-0.59%)` | :arrow_down: |
   | [datafusion/sql/src/utils.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcWwvc3JjL3V0aWxzLnJz) | `50.19% <0.00%> (-0.79%)` | :arrow_down: |
   | [datafusion/expr/src/expr\_schema.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9leHByL3NyYy9leHByX3NjaGVtYS5ycw==) | `66.88% <16.66%> (-2.04%)` | :arrow_down: |
   | ... and [11 more](https://codecov.io/gh/apache/arrow-datafusion/pull/2549/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2549?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2549?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4b3eb1c...2d32cb2](https://codecov.io/gh/apache/arrow-datafusion/pull/2549?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [arrow-datafusion] andygrove commented on pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#issuecomment-1130245725

   @ovr could you merge the latest from master into this branch to pick up https://github.com/apache/arrow-datafusion/pull/2567




[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2549: feat: Initial support for AnyExpression

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#discussion_r875208686


##########
datafusion/physical-expr/src/expressions/any.rs:
##########
@@ -0,0 +1,570 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Any expression
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
+    PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::ArrowPrimitiveType;
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+
+use crate::expressions::try_cast;
+use crate::PhysicalExpr;
+use arrow::array::*;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{ColumnarValue, Operator};
+
+macro_rules! compare_op_scalar {
+    ($LEFT: expr, $LIST_VALUES:expr, $OP:expr, $LIST_VALUES_TYPE:ty, $LIST_FROM_SCALAR: expr) => {{
+        let mut builder = BooleanBuilder::new($LEFT.len());
+
+        if $LIST_FROM_SCALAR {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(0) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(0)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        } else {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(i) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(i)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish())
+    }};
+}
+
+macro_rules! make_primitive {
+    ($VALUES:expr, $IN_VALUES:expr, $NEGATED:expr, $TYPE:ident, $LIST_FROM_SCALAR: expr) => {{
+        let left = $VALUES.as_any().downcast_ref::<$TYPE>().expect(&format!(
+            "Unable to downcast values to {}",
+            stringify!($TYPE)
+        ));
+
+        if $NEGATED {
+            Ok(ColumnarValue::Array(Arc::new(neq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        }
+    }};
+}
+
+fn eq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn neq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| !v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn eq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn neq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn eq_utf8<OffsetSize: StringOffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+fn neq_utf8<OffsetSize: StringOffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+/// AnyExpr
+#[derive(Debug)]
+pub struct AnyExpr {
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+}
+
+impl AnyExpr {
+    /// Create a new InList expression
+    pub fn new(
+        value: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        list: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self { value, op, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: StringOffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+
+    /// Get the left side of the binary expression
+    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.value
+    }
+
+    /// Get the right side of the binary expression
+    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.list
+    }
+
+    /// Get the operator for this binary expression
+    pub fn op(&self) -> &Operator {
+        &self.op
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_bool(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+}
+
+impl std::fmt::Display for AnyExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} ANY({})", self.value, self.op, self.list)
+    }
+}
+
+impl PhysicalExpr for AnyExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.value.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = match self.value.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        };
+
+        let (list, list_from_scalar) = match self.list.evaluate(batch)? {
+            ColumnarValue::Array(array) => (array, false),
+            ColumnarValue::Scalar(scalar) => (scalar.to_array(), true),
+        };
+        let as_list = list
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .expect("Unable to downcast list to ListArray");
+
+        let negated = match self.op {
+            Operator::Eq => false,
+            Operator::NotEq => true,
+            op => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Operator for ANY expression, actual: {:?}",
+                    op
+                )));
+            }
+        };
+
+        match value.data_type() {
+            DataType::Float16 => {
+                make_primitive!(value, as_list, negated, Float16Array, list_from_scalar)
+            }
+            DataType::Float32 => {
+                make_primitive!(value, as_list, negated, Float32Array, list_from_scalar)
+            }
+            DataType::Float64 => {
+                make_primitive!(value, as_list, negated, Float64Array, list_from_scalar)
+            }
+            DataType::Int8 => {
+                make_primitive!(value, as_list, negated, Int8Array, list_from_scalar)
+            }
+            DataType::Int16 => {
+                make_primitive!(value, as_list, negated, Int16Array, list_from_scalar)
+            }
+            DataType::Int32 => {
+                make_primitive!(value, as_list, negated, Int32Array, list_from_scalar)
+            }
+            DataType::Int64 => {
+                make_primitive!(value, as_list, negated, Int64Array, list_from_scalar)
+            }
+            DataType::UInt8 => {
+                make_primitive!(value, as_list, negated, UInt8Array, list_from_scalar)
+            }
+            DataType::UInt16 => {
+                make_primitive!(value, as_list, negated, UInt16Array, list_from_scalar)
+            }
+            DataType::UInt32 => {
+                make_primitive!(value, as_list, negated, UInt32Array, list_from_scalar)
+            }
+            DataType::UInt64 => {
+                make_primitive!(value, as_list, negated, UInt64Array, list_from_scalar)
+            }
+            DataType::Boolean => {
+                self.compare_bool(value, as_list, negated, list_from_scalar)
+            }
+            DataType::Utf8 => {
+                self.compare_utf8::<i32>(value, as_list, negated, list_from_scalar)
+            }
+            DataType::LargeUtf8 => {
+                self.compare_utf8::<i64>(value, as_list, negated, list_from_scalar)
+            }
+            datatype => Result::Err(DataFusionError::NotImplemented(format!(
+                "AnyExpr does not support datatype {:?}.",
+                datatype
+            ))),
+        }
+    }
+}
+
+/// return two physical expressions that are optionally coerced to a
+/// common type that the binary operator supports.
+fn any_cast(
+    value: Arc<dyn PhysicalExpr>,
+    _op: &Operator,
+    list: Arc<dyn PhysicalExpr>,
+    input_schema: &Schema,
+) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
+    let tmp = list.data_type(input_schema)?;
+    let list_type = match &tmp {
+        DataType::List(f) => f.data_type(),
+        _ => panic!("wtf"),

Review Comment:
   We should return `Err` here, with a slightly more meaningful error message ;-)
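
   Something along these lines, as a rough sketch. The `DataType` and `PlanError` enums below are simplified stand-ins rather than the real arrow and DataFusion types, and `list_element_type` is a hypothetical helper name:

```rust
/// Hypothetical stand-in for arrow's `DataType`, reduced to two variants.
#[derive(Debug, PartialEq)]
enum DataType {
    Int32,
    List(Box<DataType>),
}

/// Hypothetical stand-in for `DataFusionError`.
#[derive(Debug, PartialEq)]
enum PlanError {
    Plan(String),
}

/// Return the element type of a list type, or an error that names the
/// offending type instead of panicking.
fn list_element_type(data_type: &DataType) -> Result<&DataType, PlanError> {
    match data_type {
        DataType::List(inner) => Ok(inner.as_ref()),
        other => Err(PlanError::Plan(format!(
            "ANY expects a list on the right-hand side, got {:?}",
            other
        ))),
    }
}
```

   The caller can then propagate the failure with `?`, so a non-list argument surfaces as a query error rather than aborting the process.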





[GitHub] [arrow-datafusion] ovr commented on a diff in pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
ovr commented on code in PR #2549:
URL: https://github.com/apache/arrow-datafusion/pull/2549#discussion_r876413237


##########
datafusion/physical-expr/src/expressions/any.rs:
##########
@@ -0,0 +1,674 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Any expression
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
+    PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::ArrowPrimitiveType;
+use arrow::{
+    datatypes::{DataType, Schema},
+    record_batch::RecordBatch,
+};
+
+use crate::expressions::try_cast;
+use crate::PhysicalExpr;
+use arrow::array::*;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{ColumnarValue, Operator};
+
+macro_rules! compare_op_scalar {
+    ($LEFT: expr, $LIST_VALUES:expr, $OP:expr, $LIST_VALUES_TYPE:ty, $LIST_FROM_SCALAR: expr) => {{
+        let mut builder = BooleanBuilder::new($LEFT.len());
+
+        if $LIST_FROM_SCALAR {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(0) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(0)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        } else {
+            for i in 0..$LEFT.len() {
+                if $LEFT.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    if $LIST_VALUES.is_null(i) {
+                        builder.append_null()?;
+                    } else {
+                        builder.append_value($OP(
+                            $LEFT.value(i),
+                            $LIST_VALUES
+                                .value(i)
+                                .as_any()
+                                .downcast_ref::<$LIST_VALUES_TYPE>()
+                                .unwrap(),
+                        ))?;
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish())
+    }};
+}
+
+macro_rules! make_primitive {
+    ($VALUES:expr, $IN_VALUES:expr, $NEGATED:expr, $TYPE:ident, $LIST_FROM_SCALAR: expr) => {{
+        let left = $VALUES.as_any().downcast_ref::<$TYPE>().expect(&format!(
+            "Unable to downcast values to {}",
+            stringify!($TYPE)
+        ));
+
+        if $NEGATED {
+            Ok(ColumnarValue::Array(Arc::new(neq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_primitive(
+                left,
+                $IN_VALUES,
+                $LIST_FROM_SCALAR,
+            )?)))
+        }
+    }};
+}
+
+fn eq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn neq_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &PrimitiveArray<T>| !v.values().contains(&x),
+        PrimitiveArray<T>,
+        list_from_scalar
+    )
+}
+
+fn eq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn neq_bool(
+    array: &BooleanArray,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &BooleanArray| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        BooleanArray,
+        list_from_scalar
+    )
+}
+
+fn eq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return true;
+                }
+            }
+
+            false
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+fn neq_utf8<OffsetSize: OffsetSizeTrait>(
+    array: &GenericStringArray<OffsetSize>,
+    list: &ListArray,
+    list_from_scalar: bool,
+) -> Result<BooleanArray> {
+    compare_op_scalar!(
+        array,
+        list,
+        |x, v: &GenericStringArray<OffsetSize>| unsafe {
+            for i in 0..v.len() {
+                if v.value_unchecked(i) == x {
+                    return false;
+                }
+            }
+
+            true
+        },
+        GenericStringArray<OffsetSize>,
+        list_from_scalar
+    )
+}
+
+/// AnyExpr
+#[derive(Debug)]
+pub struct AnyExpr {
+    value: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    list: Arc<dyn PhysicalExpr>,
+}
+
+impl AnyExpr {
+    /// Create a new Any expression
+    pub fn new(
+        value: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        list: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self { value, op, list }
+    }
+
+    /// Compare for specific utf8 types
+    fn compare_utf8<T: OffsetSizeTrait>(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<T>>()
+            .unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_utf8(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+
+    /// Get the left side of the binary expression
+    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.value
+    }
+
+    /// Get the right side of the binary expression
+    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.list
+    }
+
+    /// Get the operator for this binary expression
+    pub fn op(&self) -> &Operator {
+        &self.op
+    }
+
+    /// Compare for boolean types
+    fn compare_bool(
+        &self,
+        array: ArrayRef,
+        list: &ListArray,
+        negated: bool,
+        list_from_scalar: bool,
+    ) -> Result<ColumnarValue> {
+        let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        if negated {
+            Ok(ColumnarValue::Array(Arc::new(neq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(eq_bool(
+                array,
+                list,
+                list_from_scalar,
+            )?)))
+        }
+    }
+}
+
+impl std::fmt::Display for AnyExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} ANY({})", self.value, self.op, self.list)
+    }
+}
+
+impl PhysicalExpr for AnyExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.value.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = match self.value.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        };

Review Comment:
   > I haven't reviewed this code super carefully but I wonder what you think about reusing more of the code for InList?
   
   The nearest operator is probably `ALL`, but it's not implemented right now. I tried to reuse the code of `InList`, but came to the conclusion that it will be easier and more correct to combine `ALL` with `ANY` in the future.
   
   > It seems like the implementations of x IN (<expr>, <expr>) and x ANY (<expr>) are almost exactly the same except for the fact that the x = ANY(<expr>) has the list as a single <expr> ?
   
   No, because `ANY` supports more operators: `<`, `<=`, `>`, `>=` (these are not implemented in this PR, but still).
   
   > I mention this as it is likely the IN / NOT IN are more optimized (use a hashset for testing, for example) as well as already written.
   
   Yes, but in DataFusion `IN` supports only a vector of scalars. `ANY` is not the same, because its right-hand side can be a List (a real column) or a scalar.
   
   For example `SELECT left, values FROM table`:
   
   ````
   left | right List(Int64) - different logic is required
   1    | [1,2,3] - ArrayRef = PrimitiveArray<Int64> - different logic is required
   2    | [2,3,4]
   3    | [4,5,6]
   4    | [4,5,6]
   5    | [4,5,6]
   ````
   
   When the whole column is used, a different way of iterating over the values is required ☝️. It will probably be more correct to proceed in steps:
   
   - Support more operators
   - Introduce the `ALL` operator and extract the logic it shares with `ANY`.
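
   To illustrate the difference described above, here is a small sketch in plain Rust, with no arrow types. The names `ListSide` and `eq_any` are hypothetical and do not appear in the PR. It only models `=` over `i64` values and follows the null handling of `compare_op_scalar!` in the diff: a null value or a null list produces a null result.

   ```rust
   /// Right-hand side of `value = ANY(list)`: either one list shared by
   /// all rows (scalar) or one list per row (a real List column).
   /// `ListSide` is a hypothetical type used only for this sketch.
   enum ListSide {
       Scalar(Option<Vec<i64>>),
       Column(Vec<Option<Vec<i64>>>),
   }

   /// Evaluates `left = ANY(right)` row by row.
   /// Assumes a `Column` has at least as many rows as `left`.
   fn eq_any(left: &[Option<i64>], right: &ListSide) -> Vec<Option<bool>> {
       left.iter()
           .enumerate()
           .map(|(i, value)| {
               // Pick the list for this row: always the same one for a
               // scalar, the i-th one for a column.
               let list = match right {
                   ListSide::Scalar(list) => list.as_ref(),
                   ListSide::Column(lists) => lists[i].as_ref(),
               };
               match (value, list) {
                   (Some(v), Some(items)) => Some(items.contains(v)),
                   // Null value or null list yields null.
                   _ => None,
               }
           })
           .collect()
   }
   ```

   For the table above (left values 1 to 5 against the per-row lists), this yields true, true, false, true, true. `IN` only needs the `Scalar` case, while `ANY` over a column needs the per-row lookup.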
   
   Thanks





[GitHub] [arrow-datafusion] alamb closed pull request #2549: feat: support for AnyExpression

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #2549: feat: support for AnyExpression
URL: https://github.com/apache/arrow-datafusion/pull/2549

