You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/05 19:46:27 UTC

[arrow-datafusion] branch master updated: remove Operator::{Like,NotLike,ILike,NotILike} (#4792)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 702e11740 remove Operator::{Like,NotLike,ILike,NotILike} (#4792)
702e11740 is described below

commit 702e11740f954e689cd6febdea86001c4e0f69cc
Author: unconsolable <ch...@gmail.com>
AuthorDate: Fri Jan 6 03:46:21 2023 +0800

    remove Operator::{Like,NotLike,ILike,NotILike} (#4792)
    
    * remove Operator::{Like,NotLike,ILike,NotILike}
    
    * fix comments
---
 datafusion/core/src/physical_plan/planner.rs       |   8 -
 datafusion/expr/src/expr.rs                        |  11 +-
 datafusion/expr/src/operator.rs                    |  22 +-
 datafusion/expr/src/type_coercion/binary.rs        |  38 +--
 .../optimizer/src/simplify_expressions/utils.rs    |  16 +-
 datafusion/optimizer/src/type_coercion.rs          |  22 +-
 datafusion/physical-expr/src/expressions/binary.rs |  86 +----
 datafusion/physical-expr/src/expressions/like.rs   | 367 +++++++++++++++++++++
 datafusion/physical-expr/src/expressions/mod.rs    |   2 +
 datafusion/physical-expr/src/planner.rs            |  56 +++-
 datafusion/proto/proto/datafusion.proto            |   9 +
 datafusion/proto/src/generated/pbjson.rs           | 157 +++++++++
 datafusion/proto/src/generated/prost.rs            |  15 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   4 -
 datafusion/proto/src/logical_plan/mod.rs           |   4 -
 datafusion/proto/src/physical_plan/from_proto.rs   |  17 +
 datafusion/proto/src/physical_plan/mod.rs          |  22 ++
 datafusion/proto/src/physical_plan/to_proto.rs     |  15 +-
 18 files changed, 684 insertions(+), 187 deletions(-)

diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 217ab2aa4..371b27ed7 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -2019,14 +2019,6 @@ mod tests {
             col("c1").eq(bool_expr.clone()),
             // u32 AND bool
             col("c2").and(bool_expr),
-            // utf8 LIKE u32
-            col("c1").like(col("c2")),
-            // utf8 NOT LIKE u32
-            col("c1").not_like(col("c2")),
-            // utf8 ILIKE u32
-            col("c1").ilike(col("c2")),
-            // utf8 NOT ILIKE u32
-            col("c1").not_ilike(col("c2")),
         ];
         for case in cases {
             let logical_plan = test_csv_scan().await?.project(vec![case.clone()]);
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index fd045b1ac..7af6849ff 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -245,9 +245,6 @@ impl BinaryExpr {
         match self.op {
             Operator::Or => 5,
             Operator::And => 10,
-            Operator::Like | Operator::NotLike | Operator::ILike | Operator::NotILike => {
-                19
-            }
             Operator::NotEq
             | Operator::Eq
             | Operator::Lt
@@ -685,22 +682,22 @@ impl Expr {
 
     /// Return `self LIKE other`
     pub fn like(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Like, other)
+        Expr::Like(Like::new(false, Box::new(self), Box::new(other), None))
     }
 
     /// Return `self NOT LIKE other`
     pub fn not_like(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::NotLike, other)
+        Expr::Like(Like::new(true, Box::new(self), Box::new(other), None))
     }
 
     /// Return `self ILIKE other`
     pub fn ilike(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::ILike, other)
+        Expr::ILike(Like::new(false, Box::new(self), Box::new(other), None))
     }
 
     /// Return `self NOT ILIKE other`
     pub fn not_ilike(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::NotILike, other)
+        Expr::ILike(Like::new(true, Box::new(self), Box::new(other), None))
     }
 
     /// Return `self AS name` alias expression
diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs
index 4c623bb2c..fac81654c 100644
--- a/datafusion/expr/src/operator.rs
+++ b/datafusion/expr/src/operator.rs
@@ -51,14 +51,6 @@ pub enum Operator {
     And,
     /// Logical OR, like `||`
     Or,
-    /// Matches a wildcard pattern
-    Like,
-    /// Does not match a wildcard pattern
-    NotLike,
-    /// Matches a wildcard pattern, ignores case
-    ILike,
-    /// Does not match a wildcard pattern, ignores case
-    NotILike,
     /// IS DISTINCT FROM
     IsDistinctFrom,
     /// IS NOT DISTINCT FROM
@@ -96,10 +88,6 @@ impl Operator {
             Operator::LtEq => Some(Operator::Gt),
             Operator::Gt => Some(Operator::LtEq),
             Operator::GtEq => Some(Operator::Lt),
-            Operator::Like => Some(Operator::NotLike),
-            Operator::NotLike => Some(Operator::Like),
-            Operator::ILike => Some(Operator::NotILike),
-            Operator::NotILike => Some(Operator::ILike),
             Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom),
             Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom),
             Operator::Plus
@@ -133,11 +121,7 @@ impl Operator {
             Operator::LtEq => Some(Operator::GtEq),
             Operator::Gt => Some(Operator::Lt),
             Operator::GtEq => Some(Operator::LtEq),
-            Operator::Like
-            | Operator::NotLike
-            | Operator::ILike
-            | Operator::NotILike
-            | Operator::IsDistinctFrom
+            Operator::IsDistinctFrom
             | Operator::IsNotDistinctFrom
             | Operator::Plus
             | Operator::Minus
@@ -176,10 +160,6 @@ impl fmt::Display for Operator {
             Operator::Modulo => "%",
             Operator::And => "AND",
             Operator::Or => "OR",
-            Operator::Like => "LIKE",
-            Operator::NotLike => "NOT LIKE",
-            Operator::ILike => "ILIKE",
-            Operator::NotILike => "NOT ILIKE",
             Operator::RegexMatch => "~",
             Operator::RegexIMatch => "~*",
             Operator::RegexNotMatch => "!~",
diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs
index 2de1c7a66..d2923c8db 100644
--- a/datafusion/expr/src/type_coercion/binary.rs
+++ b/datafusion/expr/src/type_coercion/binary.rs
@@ -45,10 +45,6 @@ pub fn binary_operator_data_type(
         | Operator::NotEq
         | Operator::And
         | Operator::Or
-        | Operator::Like
-        | Operator::NotLike
-        | Operator::ILike
-        | Operator::NotILike
         | Operator::Lt
         | Operator::Gt
         | Operator::GtEq
@@ -117,10 +113,6 @@ pub fn coerce_types(
         | Operator::Gt
         | Operator::GtEq
         | Operator::LtEq => comparison_coercion(lhs_type, rhs_type),
-        // "like" operators operate on strings and always return a boolean
-        Operator::Like | Operator::NotLike | Operator::ILike | Operator::NotILike => {
-            like_coercion(lhs_type, rhs_type)
-        }
         Operator::Plus | Operator::Minus
             if is_date(lhs_type) || is_timestamp(lhs_type) =>
         {
@@ -525,7 +517,7 @@ fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType>
 
 /// coercion rules for like operations.
 /// This is a union of string coercion rules and dictionary coercion rules
-fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
+pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
     string_coercion(lhs_type, rhs_type)
         .or_else(|| dictionary_coercion(lhs_type, rhs_type, false))
         .or_else(|| null_coercion(lhs_type, rhs_type))
@@ -879,30 +871,10 @@ mod tests {
 
     #[test]
     fn test_type_coercion() -> Result<()> {
-        test_coercion_binary_rule!(
-            DataType::Utf8,
-            DataType::Utf8,
-            Operator::Like,
-            DataType::Utf8
-        );
-        test_coercion_binary_rule!(
-            DataType::Utf8,
-            DataType::Utf8,
-            Operator::NotLike,
-            DataType::Utf8
-        );
-        test_coercion_binary_rule!(
-            DataType::Utf8,
-            DataType::Utf8,
-            Operator::ILike,
-            DataType::Utf8
-        );
-        test_coercion_binary_rule!(
-            DataType::Utf8,
-            DataType::Utf8,
-            Operator::NotILike,
-            DataType::Utf8
-        );
+        // test like coercion rule
+        let result = like_coercion(&DataType::Utf8, &DataType::Utf8);
+        assert_eq!(result, Some(DataType::Utf8));
+
         test_coercion_binary_rule!(
             DataType::Utf8,
             DataType::Date32,
diff --git a/datafusion/optimizer/src/simplify_expressions/utils.rs b/datafusion/optimizer/src/simplify_expressions/utils.rs
index 7bf7b4ecc..10e5c0e87 100644
--- a/datafusion/optimizer/src/simplify_expressions/utils.rs
+++ b/datafusion/optimizer/src/simplify_expressions/utils.rs
@@ -21,7 +21,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::{
     expr::{Between, BinaryExpr},
     expr_fn::{and, concat_ws, or},
-    lit, BuiltinScalarFunction, Expr, Operator,
+    lit, BuiltinScalarFunction, Expr, Like, Operator,
 };
 
 pub static POWS_OF_TEN: [i128; 38] = [
@@ -232,6 +232,20 @@ pub fn negate_clause(expr: Expr) -> Expr {
             between.low,
             between.high,
         )),
+        // not (A like B) ===> A not like B
+        Expr::Like(like) => Expr::Like(Like::new(
+            !like.negated,
+            like.expr,
+            like.pattern,
+            like.escape_char,
+        )),
+        // not (A ilike B) ===> A not ilike B
+        Expr::ILike(like) => Expr::ILike(Like::new(
+            !like.negated,
+            like.expr,
+            like.pattern,
+            like.escape_char,
+        )),
         // use not clause
         _ => Expr::Not(Box::new(expr)),
     }
diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs
index 23d1d3706..e335c9ff5 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -27,7 +27,9 @@ use datafusion_common::{
 use datafusion_expr::expr::{self, Between, BinaryExpr, Case, Like, WindowFunction};
 use datafusion_expr::expr_rewriter::{ExprRewriter, RewriteRecursion};
 use datafusion_expr::logical_plan::Subquery;
-use datafusion_expr::type_coercion::binary::{coerce_types, comparison_coercion};
+use datafusion_expr::type_coercion::binary::{
+    coerce_types, comparison_coercion, like_coercion,
+};
 use datafusion_expr::type_coercion::functions::data_types;
 use datafusion_expr::type_coercion::other::{
     get_coerce_type_for_case_when, get_coerce_type_for_list,
@@ -175,8 +177,11 @@ impl ExprRewriter for TypeCoercionRewriter {
             }) => {
                 let left_type = expr.get_type(&self.schema)?;
                 let right_type = pattern.get_type(&self.schema)?;
-                let coerced_type =
-                    coerce_types(&left_type, &Operator::Like, &right_type)?;
+                let coerced_type = like_coercion(&left_type,  &right_type).ok_or_else(|| {
+                    DataFusionError::Plan(format!(
+                        "There isn't a common type to coerce {left_type} and {right_type} in LIKE expression"
+                    ))
+                })?;
                 let expr = Box::new(expr.cast_to(&coerced_type, &self.schema)?);
                 let pattern = Box::new(pattern.cast_to(&coerced_type, &self.schema)?);
                 let expr = Expr::Like(Like::new(negated, expr, pattern, escape_char));
@@ -190,8 +195,11 @@ impl ExprRewriter for TypeCoercionRewriter {
             }) => {
                 let left_type = expr.get_type(&self.schema)?;
                 let right_type = pattern.get_type(&self.schema)?;
-                let coerced_type =
-                    coerce_types(&left_type, &Operator::ILike, &right_type)?;
+                let coerced_type = like_coercion(&left_type,  &right_type).ok_or_else(|| {
+                    DataFusionError::Plan(format!(
+                        "There isn't a common type to coerce {left_type} and {right_type} in ILIKE expression"
+                    ))
+                })?;
                 let expr = Box::new(expr.cast_to(&coerced_type, &self.schema)?);
                 let pattern = Box::new(pattern.cast_to(&coerced_type, &self.schema)?);
                 let expr = Expr::ILike(Like::new(negated, expr, pattern, escape_char));
@@ -932,7 +940,9 @@ mod test {
         let plan = LogicalPlan::Projection(Projection::try_new(vec![like_expr], empty)?);
         let err = assert_optimized_plan_eq(&plan, expected);
         assert!(err.is_err());
-        assert!(err.unwrap_err().to_string().contains("'Int64 LIKE Utf8' can't be evaluated because there isn't a common type to coerce the types to"));
+        assert!(err.unwrap_err().to_string().contains(
+            "There isn't a common type to coerce Int64 and Utf8 in LIKE expression"
+        ));
         Ok(())
     }
 
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 82e675425..149eee593 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -28,6 +28,8 @@ use arrow::compute::kernels::arithmetic::{
     multiply_scalar, subtract, subtract_scalar,
 };
 use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
+use arrow::compute::kernels::comparison::regexp_is_match_utf8;
+use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar;
 use arrow::compute::kernels::comparison::{
     eq_dyn_binary_scalar, gt_dyn_binary_scalar, gt_eq_dyn_binary_scalar,
     lt_dyn_binary_scalar, lt_eq_dyn_binary_scalar, neq_dyn_binary_scalar,
@@ -47,13 +49,6 @@ use arrow::compute::kernels::comparison::{
 use arrow::compute::kernels::comparison::{
     eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar,
 };
-use arrow::compute::kernels::comparison::{
-    ilike_utf8, like_utf8, nilike_utf8, nlike_utf8, regexp_is_match_utf8,
-};
-use arrow::compute::kernels::comparison::{
-    ilike_utf8_scalar, like_utf8_scalar, nilike_utf8_scalar, nlike_utf8_scalar,
-    regexp_is_match_utf8_scalar,
-};
 
 use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
 use arrow::compute::kernels::concat_elements::concat_elements_utf8;
@@ -344,19 +339,6 @@ macro_rules! compute_op {
     }};
 }
 
-macro_rules! binary_string_array_op_scalar {
-    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
-        let result: Result<Arc<dyn Array>> = match $LEFT.data_type() {
-            DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE),
-            other => Err(DataFusionError::Internal(format!(
-                "Data type {:?} not supported for scalar operation '{}' on string array",
-                other, stringify!($OP)
-            ))),
-        };
-        Some(result)
-    }};
-}
-
 macro_rules! binary_string_array_op {
     ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
         match $LEFT.data_type() {
@@ -941,18 +923,6 @@ impl BinaryExpr {
             Operator::NotEq => {
                 binary_array_op_dyn_scalar!(array, scalar.clone(), neq, bool_type)
             }
-            Operator::Like => {
-                binary_string_array_op_scalar!(array, scalar.clone(), like, bool_type)
-            }
-            Operator::NotLike => {
-                binary_string_array_op_scalar!(array, scalar.clone(), nlike, bool_type)
-            }
-            Operator::ILike => {
-                binary_string_array_op_scalar!(array, scalar.clone(), ilike, bool_type)
-            }
-            Operator::NotILike => {
-                binary_string_array_op_scalar!(array, scalar.clone(), nilike, bool_type)
-            }
             Operator::Plus => {
                 binary_primitive_array_op_scalar!(array, scalar.clone(), add)
             }
@@ -1053,10 +1023,6 @@ impl BinaryExpr {
         right_data_type: &DataType,
     ) -> Result<ArrayRef> {
         match &self.op {
-            Operator::Like => binary_string_array_op!(left, right, like),
-            Operator::NotLike => binary_string_array_op!(left, right, nlike),
-            Operator::ILike => binary_string_array_op!(left, right, ilike),
-            Operator::NotILike => binary_string_array_op!(left, right, nilike),
             Operator::Lt => lt_dyn(&left, &right),
             Operator::LtEq => lt_eq_dyn(&left, &right),
             Operator::Gt => gt_dyn(&left, &right),
@@ -1347,54 +1313,6 @@ mod tests {
             DataType::Float32,
             vec![2f32],
         );
-        test_coercion!(
-            StringArray,
-            DataType::Utf8,
-            vec!["hello world", "world"],
-            StringArray,
-            DataType::Utf8,
-            vec!["%hello%", "%hello%"],
-            Operator::Like,
-            BooleanArray,
-            DataType::Boolean,
-            vec![true, false],
-        );
-        test_coercion!(
-            StringArray,
-            DataType::Utf8,
-            vec!["hello world", "world"],
-            StringArray,
-            DataType::Utf8,
-            vec!["%hello%", "%hello%"],
-            Operator::NotLike,
-            BooleanArray,
-            DataType::Boolean,
-            vec![false, true],
-        );
-        test_coercion!(
-            StringArray,
-            DataType::Utf8,
-            vec!["hEllo world", "world"],
-            StringArray,
-            DataType::Utf8,
-            vec!["%helLo%", "%helLo%"],
-            Operator::ILike,
-            BooleanArray,
-            DataType::Boolean,
-            vec![true, false],
-        );
-        test_coercion!(
-            StringArray,
-            DataType::Utf8,
-            vec!["hEllo world", "world"],
-            StringArray,
-            DataType::Utf8,
-            vec!["%helLo%", "%helLo%"],
-            Operator::NotILike,
-            BooleanArray,
-            DataType::Boolean,
-            vec![false, true],
-        );
         test_coercion!(
             StringArray,
             DataType::Utf8,
diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs
new file mode 100644
index 000000000..bd35b4eca
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/like.rs
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    array::{new_null_array, Array, ArrayRef, StringArray},
+    record_batch::RecordBatch,
+};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+
+use crate::{physical_expr::down_cast_any_ref, AnalysisContext, PhysicalExpr};
+
+use arrow::compute::kernels::comparison::{
+    ilike_utf8, like_utf8, nilike_utf8, nlike_utf8,
+};
+use arrow::compute::kernels::comparison::{
+    ilike_utf8_scalar, like_utf8_scalar, nilike_utf8_scalar, nlike_utf8_scalar,
+};
+
+// Like expression
+#[derive(Debug)]
+pub struct LikeExpr {
+    negated: bool,
+    case_insensitive: bool,
+    expr: Arc<dyn PhysicalExpr>,
+    pattern: Arc<dyn PhysicalExpr>,
+}
+
+impl LikeExpr {
+    pub fn new(
+        negated: bool,
+        case_insensitive: bool,
+        expr: Arc<dyn PhysicalExpr>,
+        pattern: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self {
+            negated,
+            case_insensitive,
+            expr,
+            pattern,
+        }
+    }
+
+    /// Is negated
+    pub fn negated(&self) -> bool {
+        self.negated
+    }
+
+    /// Is case insensitive
+    pub fn case_insensitive(&self) -> bool {
+        self.case_insensitive
+    }
+
+    /// Input expression
+    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.expr
+    }
+
+    /// Pattern expression
+    pub fn pattern(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.pattern
+    }
+
+    /// Operator name
+    fn op_name(&self) -> &str {
+        match (self.negated, self.case_insensitive) {
+            (false, false) => "LIKE",
+            (true, false) => "NOT LIKE",
+            (false, true) => "ILIKE",
+            (true, true) => "NOT ILIKE",
+        }
+    }
+}
+
+impl std::fmt::Display for LikeExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} {} {}", self.expr, self.op_name(), self.pattern)
+    }
+}
+
+impl PhysicalExpr for LikeExpr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        Ok(self.expr.nullable(input_schema)? || self.pattern.nullable(input_schema)?)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let expr_value = self.expr.evaluate(batch)?;
+        let pattern_value = self.pattern.evaluate(batch)?;
+        let expr_data_type = expr_value.data_type();
+        let pattern_data_type = pattern_value.data_type();
+
+        match (
+            &expr_value,
+            &expr_data_type,
+            &pattern_value,
+            &pattern_data_type,
+        ) {
+            // Types are equal => valid
+            (_, l, _, r) if l == r => {}
+            // Allow comparing a dictionary value with its corresponding scalar value
+            (
+                ColumnarValue::Array(_),
+                DataType::Dictionary(_, dict_t),
+                ColumnarValue::Scalar(_),
+                scalar_t,
+            )
+            | (
+                ColumnarValue::Scalar(_),
+                scalar_t,
+                ColumnarValue::Array(_),
+                DataType::Dictionary(_, dict_t),
+            ) if dict_t.as_ref() == scalar_t => {}
+            _ => {
+                return Err(DataFusionError::Internal(format!(
+                    "Cannot evaluate {} expression with types {:?} and {:?}",
+                    self.op_name(),
+                    expr_data_type,
+                    pattern_data_type
+                )));
+            }
+        }
+
+        // Attempt to use special kernels if one input is scalar and the other is an array
+        let scalar_result = match (&expr_value, &pattern_value) {
+            (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => {
+                self.evaluate_array_scalar(array, scalar)?
+            }
+            (_, _) => None, // default to array implementation
+        };
+
+        if let Some(result) = scalar_result {
+            return result.map(|a| ColumnarValue::Array(a));
+        }
+
+        // if both arrays or both literals - extract arrays and continue execution
+        let (expr, pattern) = (
+            expr_value.into_array(batch.num_rows()),
+            pattern_value.into_array(batch.num_rows()),
+        );
+        self.evaluate_array_array(expr, pattern)
+            .map(|a| ColumnarValue::Array(a))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone(), self.pattern.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(Arc::new(LikeExpr::new(
+            self.negated,
+            self.case_insensitive,
+            children[0].clone(),
+            children[1].clone(),
+        )))
+    }
+
+    /// Return the boundaries of this binary expression's result.
+    fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
+        context.with_boundaries(None)
+    }
+}
+
+impl PartialEq<dyn Any> for LikeExpr {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.negated == x.negated
+                    && self.case_insensitive == x.case_insensitive
+                    && self.expr.eq(&x.expr)
+                    && self.pattern.eq(&x.pattern)
+            })
+            .unwrap_or(false)
+    }
+}
+
+macro_rules! binary_string_array_op_scalar {
+    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
+        let result: Result<Arc<dyn Array>> = match $LEFT.data_type() {
+            DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE),
+            other => Err(DataFusionError::Internal(format!(
+                "Data type {:?} not supported for scalar operation '{}' on string array",
+                other, stringify!($OP)
+            ))),
+        };
+        Some(result)
+    }};
+}
+
+impl LikeExpr {
+    /// Evaluate the expression if the input is an array and
+    /// pattern is literal - use scalar operations
+    fn evaluate_array_scalar(
+        &self,
+        array: &dyn Array,
+        scalar: &ScalarValue,
+    ) -> Result<Option<Result<ArrayRef>>> {
+        let scalar_result = match (self.negated, self.case_insensitive) {
+            (false, false) => binary_string_array_op_scalar!(
+                array,
+                scalar.clone(),
+                like,
+                &DataType::Boolean
+            ),
+            (true, false) => binary_string_array_op_scalar!(
+                array,
+                scalar.clone(),
+                nlike,
+                &DataType::Boolean
+            ),
+            (false, true) => binary_string_array_op_scalar!(
+                array,
+                scalar.clone(),
+                ilike,
+                &DataType::Boolean
+            ),
+            (true, true) => binary_string_array_op_scalar!(
+                array,
+                scalar.clone(),
+                nilike,
+                &DataType::Boolean
+            ),
+        };
+        Ok(scalar_result)
+    }
+
+    fn evaluate_array_array(
+        &self,
+        left: Arc<dyn Array>,
+        right: Arc<dyn Array>,
+    ) -> Result<ArrayRef> {
+        match (self.negated, self.case_insensitive) {
+            (false, false) => binary_string_array_op!(left, right, like),
+            (true, false) => binary_string_array_op!(left, right, nlike),
+            (false, true) => binary_string_array_op!(left, right, ilike),
+            (true, true) => binary_string_array_op!(left, right, nilike),
+        }
+    }
+}
+
+/// Create a like expression, erroring if the argument types are not compatible.
+pub fn like(
+    negated: bool,
+    case_insensitive: bool,
+    expr: Arc<dyn PhysicalExpr>,
+    pattern: Arc<dyn PhysicalExpr>,
+    input_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    let expr_type = &expr.data_type(input_schema)?;
+    let pattern_type = &pattern.data_type(input_schema)?;
+    if !expr_type.eq(pattern_type) {
+        return Err(DataFusionError::Internal(format!(
+            "The type of {expr_type} AND {pattern_type} of like physical should be same"
+        )));
+    }
+    Ok(Arc::new(LikeExpr::new(
+        negated,
+        case_insensitive,
+        expr,
+        pattern,
+    )))
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::expressions::col;
+    use arrow::array::BooleanArray;
+    use arrow_schema::Field;
+    use datafusion_common::cast::as_boolean_array;
+
+    macro_rules! test_like {
+        ($A_VEC:expr, $B_VEC:expr, $VEC:expr, $NULLABLE: expr, $NEGATED:expr, $CASE_INSENSITIVE:expr,) => {{
+            let schema = Schema::new(vec![
+                Field::new("a", DataType::Utf8, $NULLABLE),
+                Field::new("b", DataType::Utf8, $NULLABLE),
+            ]);
+            let a = StringArray::from($A_VEC);
+            let b = StringArray::from($B_VEC);
+
+            let expression = like(
+                $NEGATED,
+                $CASE_INSENSITIVE,
+                col("a", &schema)?,
+                col("b", &schema)?,
+                &schema,
+            )?;
+            let batch = RecordBatch::try_new(
+                Arc::new(schema.clone()),
+                vec![Arc::new(a), Arc::new(b)],
+            )?;
+
+            // compute
+            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+            let result =
+                as_boolean_array(&result).expect("failed to downcast to BooleanArray");
+            let expected = &BooleanArray::from($VEC);
+            assert_eq!(expected, result);
+        }};
+    }
+
+    #[test]
+    fn like_op() -> Result<()> {
+        test_like!(
+            vec!["hello world", "world"],
+            vec!["%hello%", "%hello%"],
+            vec![true, false],
+            false,
+            false,
+            false,
+        ); // like
+        test_like!(
+            vec![Some("hello world"), None, Some("world")],
+            vec![Some("%hello%"), None, Some("%hello%")],
+            vec![Some(false), None, Some(true)],
+            true,
+            true,
+            false,
+        ); // not like
+        test_like!(
+            vec!["hello world", "world"],
+            vec!["%helLo%", "%helLo%"],
+            vec![true, false],
+            false,
+            false,
+            true,
+        ); // ilike
+        test_like!(
+            vec![Some("hello world"), None, Some("world")],
+            vec![Some("%helLo%"), None, Some("%helLo%")],
+            vec![Some(false), None, Some(true)],
+            true,
+            true,
+            true,
+        ); // not ilike
+
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 8294c0ef7..411482cc4 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -27,6 +27,7 @@ mod get_indexed_field;
 mod in_list;
 mod is_not_null;
 mod is_null;
+mod like;
 mod literal;
 mod negative;
 mod no_op;
@@ -82,6 +83,7 @@ pub use get_indexed_field::GetIndexedFieldExpr;
 pub use in_list::{in_list, InListExpr};
 pub use is_not_null::{is_not_null, IsNotNullExpr};
 pub use is_null::{is_null, IsNullExpr};
+pub use like::{like, LikeExpr};
 pub use literal::{lit, Literal};
 pub use negative::{negative, NegativeExpr};
 pub use no_op::NoOp;
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 2be6c1f33..31d28f820 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -19,7 +19,7 @@ use crate::var_provider::is_system_variables;
 use crate::{
     execution_props::ExecutionProps,
     expressions::{
-        self, binary, Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal,
+        self, binary, like, Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal,
     },
     functions, udf,
     var_provider::VarType,
@@ -220,14 +220,25 @@ pub fn create_physical_expr(
                     "LIKE does not support escape_char".to_string(),
                 ));
             }
-            let op = if *negated {
-                Operator::NotLike
-            } else {
-                Operator::Like
-            };
-            let bin_expr =
-                binary_expr(expr.as_ref().clone(), op, pattern.as_ref().clone());
-            create_physical_expr(&bin_expr, input_dfschema, input_schema, execution_props)
+            let physical_expr = create_physical_expr(
+                expr,
+                input_dfschema,
+                input_schema,
+                execution_props,
+            )?;
+            let physical_pattern = create_physical_expr(
+                pattern,
+                input_dfschema,
+                input_schema,
+                execution_props,
+            )?;
+            like(
+                *negated,
+                false,
+                physical_expr,
+                physical_pattern,
+                input_schema,
+            )
         }
         Expr::ILike(Like {
             negated,
@@ -240,14 +251,25 @@ pub fn create_physical_expr(
                     "ILIKE does not support escape_char".to_string(),
                 ));
             }
-            let op = if *negated {
-                Operator::NotILike
-            } else {
-                Operator::ILike
-            };
-            let bin_expr =
-                binary_expr(expr.as_ref().clone(), op, pattern.as_ref().clone());
-            create_physical_expr(&bin_expr, input_dfschema, input_schema, execution_props)
+            let physical_expr = create_physical_expr(
+                expr,
+                input_dfschema,
+                input_schema,
+                execution_props,
+            )?;
+            let physical_pattern = create_physical_expr(
+                pattern,
+                input_dfschema,
+                input_schema,
+                execution_props,
+            )?;
+            like(
+                *negated,
+                true,
+                physical_expr,
+                physical_pattern,
+                input_schema,
+            )
         }
         Expr::Case(case) => {
             let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 7eee86465..8a9a8c0f6 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1021,6 +1021,8 @@ message PhysicalExprNode {
     PhysicalScalarUdfNode scalar_udf = 16;
 
     PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
+
+    PhysicalLikeExprNode like_expr = 18;
   }
 }
 
@@ -1074,6 +1076,13 @@ message PhysicalDateTimeIntervalExprNode {
   string op = 3;
 }
 
+message PhysicalLikeExprNode {
+  bool negated = 1;
+  bool case_insensitive = 2;
+  PhysicalExprNode expr = 3;
+  PhysicalExprNode pattern = 4;
+}
+
 message PhysicalSortExprNode {
   PhysicalExprNode expr = 1;
   bool asc = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 4913761e1..65d7af706 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -13206,6 +13206,9 @@ impl serde::Serialize for PhysicalExprNode {
                 physical_expr_node::ExprType::DateTimeIntervalExpr(v) => {
                     struct_ser.serialize_field("dateTimeIntervalExpr", v)?;
                 }
+                physical_expr_node::ExprType::LikeExpr(v) => {
+                    struct_ser.serialize_field("likeExpr", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -13247,6 +13250,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
             "scalarUdf",
             "date_time_interval_expr",
             "dateTimeIntervalExpr",
+            "like_expr",
+            "likeExpr",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -13268,6 +13273,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
             WindowExpr,
             ScalarUdf,
             DateTimeIntervalExpr,
+            LikeExpr,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -13306,6 +13312,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
                             "windowExpr" | "window_expr" => Ok(GeneratedField::WindowExpr),
                             "scalarUdf" | "scalar_udf" => Ok(GeneratedField::ScalarUdf),
                             "dateTimeIntervalExpr" | "date_time_interval_expr" => Ok(GeneratedField::DateTimeIntervalExpr),
+                            "likeExpr" | "like_expr" => Ok(GeneratedField::LikeExpr),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -13445,6 +13452,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
                                 return Err(serde::de::Error::duplicate_field("dateTimeIntervalExpr"));
                             }
                             expr_type__ = map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DateTimeIntervalExpr)
+;
+                        }
+                        GeneratedField::LikeExpr => {
+                            if expr_type__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("likeExpr"));
+                            }
+                            expr_type__ = map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::LikeExpr)
 ;
                         }
                     }
@@ -13986,6 +14000,149 @@ impl<'de> serde::Deserialize<'de> for PhysicalIsNull {
         deserializer.deserialize_struct("datafusion.PhysicalIsNull", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for PhysicalLikeExprNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.negated {
+            len += 1;
+        }
+        if self.case_insensitive {
+            len += 1;
+        }
+        if self.expr.is_some() {
+            len += 1;
+        }
+        if self.pattern.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalLikeExprNode", len)?;
+        if self.negated {
+            struct_ser.serialize_field("negated", &self.negated)?;
+        }
+        if self.case_insensitive {
+            struct_ser.serialize_field("caseInsensitive", &self.case_insensitive)?;
+        }
+        if let Some(v) = self.expr.as_ref() {
+            struct_ser.serialize_field("expr", v)?;
+        }
+        if let Some(v) = self.pattern.as_ref() {
+            struct_ser.serialize_field("pattern", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PhysicalLikeExprNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "negated",
+            "case_insensitive",
+            "caseInsensitive",
+            "expr",
+            "pattern",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Negated,
+            CaseInsensitive,
+            Expr,
+            Pattern,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "negated" => Ok(GeneratedField::Negated),
+                            "caseInsensitive" | "case_insensitive" => Ok(GeneratedField::CaseInsensitive),
+                            "expr" => Ok(GeneratedField::Expr),
+                            "pattern" => Ok(GeneratedField::Pattern),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PhysicalLikeExprNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.PhysicalLikeExprNode")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<PhysicalLikeExprNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut negated__ = None;
+                let mut case_insensitive__ = None;
+                let mut expr__ = None;
+                let mut pattern__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Negated => {
+                            if negated__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("negated"));
+                            }
+                            negated__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::CaseInsensitive => {
+                            if case_insensitive__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("caseInsensitive"));
+                            }
+                            case_insensitive__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::Expr => {
+                            if expr__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("expr"));
+                            }
+                            expr__ = map.next_value()?;
+                        }
+                        GeneratedField::Pattern => {
+                            if pattern__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("pattern"));
+                            }
+                            pattern__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(PhysicalLikeExprNode {
+                    negated: negated__.unwrap_or_default(),
+                    case_insensitive: case_insensitive__.unwrap_or_default(),
+                    expr: expr__,
+                    pattern: pattern__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.PhysicalLikeExprNode", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PhysicalNegativeNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index a67787559..a20b15fcc 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1274,7 +1274,7 @@ pub struct PhysicalExtensionNode {
 pub struct PhysicalExprNode {
     #[prost(
         oneof = "physical_expr_node::ExprType",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18"
     )]
     pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
 }
@@ -1323,6 +1323,8 @@ pub mod physical_expr_node {
         DateTimeIntervalExpr(
             ::prost::alloc::boxed::Box<super::PhysicalDateTimeIntervalExprNode>,
         ),
+        #[prost(message, tag = "18")]
+        LikeExpr(::prost::alloc::boxed::Box<super::PhysicalLikeExprNode>),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -1404,6 +1406,17 @@ pub struct PhysicalDateTimeIntervalExprNode {
     pub op: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalLikeExprNode {
+    #[prost(bool, tag = "1")]
+    pub negated: bool,
+    #[prost(bool, tag = "2")]
+    pub case_insensitive: bool,
+    #[prost(message, optional, boxed, tag = "3")]
+    pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+    #[prost(message, optional, boxed, tag = "4")]
+    pub pattern: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalSortExprNode {
     #[prost(message, optional, boxed, tag = "1")]
     pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index b17a9c4c1..f9ded9237 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -1370,10 +1370,6 @@ pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
         "Multiply" => Ok(Operator::Multiply),
         "Divide" => Ok(Operator::Divide),
         "Modulo" => Ok(Operator::Modulo),
-        "Like" => Ok(Operator::Like),
-        "NotLike" => Ok(Operator::NotLike),
-        "ILike" => Ok(Operator::ILike),
-        "NotILike" => Ok(Operator::NotILike),
         "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
         "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
         "BitwiseAnd" => Ok(Operator::BitwiseAnd),
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index ff94d4670..78319f497 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -2299,10 +2299,6 @@ mod roundtrip_tests {
         test(Operator::RegexNotMatch);
         test(Operator::RegexIMatch);
         test(Operator::RegexMatch);
-        test(Operator::Like);
-        test(Operator::NotLike);
-        test(Operator::ILike);
-        test(Operator::NotILike);
         test(Operator::BitwiseShiftRight);
         test(Operator::BitwiseShiftLeft);
         test(Operator::BitwiseAnd);
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index ffc010465..f1f7f8844 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -29,6 +29,7 @@ use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::window_function::WindowFunction;
 use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
+use datafusion::physical_plan::expressions::LikeExpr;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{
     expressions::{
@@ -217,6 +218,22 @@ pub(crate) fn parse_physical_expr(
                 &convert_required!(e.return_type)?,
             ))
         }
+        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
+            like_expr.negated,
+            like_expr.case_insensitive,
+            parse_required_physical_box_expr(
+                &like_expr.expr,
+                registry,
+                "expr",
+                input_schema,
+            )?,
+            parse_required_physical_box_expr(
+                &like_expr.pattern,
+                registry,
+                "pattern",
+                input_schema,
+            )?,
+        )),
     };
 
     Ok(pexpr)
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index cff23974e..4c0f70149 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1212,6 +1212,7 @@ mod roundtrip_tests {
     use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
     use datafusion::physical_expr::ScalarFunctionExpr;
     use datafusion::physical_plan::aggregates::PhysicalGroupBy;
+    use datafusion::physical_plan::expressions::like;
     use datafusion::physical_plan::functions;
     use datafusion::physical_plan::functions::make_scalar_function;
     use datafusion::physical_plan::projection::ProjectionExec;
@@ -1563,4 +1564,25 @@ mod roundtrip_tests {
             schema,
         )?))
     }
+
+    #[test]
+    fn roundtrip_like() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+        let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+        let like_expr = like(
+            false,
+            false,
+            col("a", &schema)?,
+            col("b", &schema)?,
+            &schema,
+        )?;
+        let plan = Arc::new(ProjectionExec::try_new(
+            vec![(like_expr, "result".to_string())],
+            input,
+        )?);
+        roundtrip_test(plan)
+    }
 }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index c5defbf77..653baec99 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -36,7 +36,9 @@ use datafusion::physical_plan::file_format::FileScanConfig;
 
 use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal};
 
-use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min, Sum};
+use datafusion::physical_plan::expressions::{
+    Avg, BinaryExpr, Column, LikeExpr, Max, Min, Sum,
+};
 use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 
 use crate::protobuf;
@@ -330,6 +332,17 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
                     ),
                 ),
             })
+        } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
+            Ok(protobuf::PhysicalExprNode {
+                expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(
+                    Box::new(protobuf::PhysicalLikeExprNode {
+                        negated: expr.negated(),
+                        case_insensitive: expr.case_insensitive(),
+                        expr: Some(Box::new(expr.expr().to_owned().try_into()?)),
+                        pattern: Some(Box::new(expr.pattern().to_owned().try_into()?)),
+                    }),
+                )),
+            })
         } else {
             Err(DataFusionError::Internal(format!(
                 "physical_plan::to_proto() unsupported expression {value:?}"