You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/08 18:00:51 UTC

[arrow-datafusion] branch master updated: feat: support prepare statement (#4490)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4538912db feat: support prepare statement (#4490)
4538912db is described below

commit 4538912db3c913220bfa0b251b6ab62842886145
Author: Nga Tran <ng...@live.com>
AuthorDate: Thu Dec 8 13:00:43 2022 -0500

    feat: support prepare statement (#4490)
    
    * feat: support prepare statement
    
    * fix: typo
    
    * chore: address preliminary review comments
    
    * fix: put Placeholder last so that the expression comparison works as expected
    
    * test: add more tests and start passing param_data_types to expression to get data types of the params $1, $2, ...
    
    * test: one more test and a bit of refactor while waiting for the CTEs/PlannerContext PR
    
    * feat: use prepare stmt's param data types in the planner context
    
    * chore: cleanup
    
    * refactor: address review comments
    
    * chore: cleanup
    
    * test: more prepare statement tests
    
    * chore: cleanup
    
    * chore: fix typos and add tests to the sqllogictests
    
    * docs: add docstring
    
    * chore: update test panic message due to recent change to have clearer message per review comment
    
    * chore: add a test and a doc string per review comments
    
    * fix: output of a test after master merge
---
 datafusion-cli/Cargo.lock                          |   1 +
 datafusion/core/src/datasource/listing/helpers.rs  |   3 +-
 datafusion/core/src/physical_plan/planner.rs       |  11 +
 .../tests/sqllogictests/test_files/prepare.slt     |  83 ++++++
 datafusion/expr/src/expr.rs                        |  11 +
 datafusion/expr/src/expr_rewriter.rs               |   1 +
 datafusion/expr/src/expr_schema.rs                 |   4 +-
 datafusion/expr/src/expr_visitor.rs                |   3 +-
 datafusion/expr/src/logical_plan/builder.rs        |  17 +-
 datafusion/expr/src/logical_plan/mod.rs            |   2 +-
 datafusion/expr/src/logical_plan/plan.rs           |  36 ++-
 datafusion/expr/src/utils.rs                       |  14 +-
 .../optimizer/src/common_subexpr_eliminate.rs      |   3 +-
 datafusion/optimizer/src/push_down_projection.rs   |   3 +-
 .../src/simplify_expressions/expr_simplifier.rs    |   3 +-
 datafusion/proto/proto/datafusion.proto            |  14 +
 datafusion/proto/src/from_proto.rs                 |  12 +-
 datafusion/proto/src/generated/pbjson.rs           | 261 +++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  24 +-
 datafusion/proto/src/logical_plan.rs               |  38 ++-
 datafusion/proto/src/to_proto.rs                   |   5 +-
 datafusion/sql/Cargo.toml                          |   1 +
 datafusion/sql/src/planner.rs                      | 319 ++++++++++++++++++++-
 datafusion/sql/src/utils.rs                        |   4 +
 24 files changed, 837 insertions(+), 36 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index dbf9cc88d..839255cea 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -800,6 +800,7 @@ dependencies = [
  "arrow-schema",
  "datafusion-common",
  "datafusion-expr",
+ "log",
  "sqlparser",
 ]
 
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 8b20fc5d6..2c014068d 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -121,7 +121,8 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
             | Expr::Sort { .. }
             | Expr::WindowFunction { .. }
             | Expr::Wildcard
-            | Expr::QualifiedWildcard { .. } => {
+            | Expr::QualifiedWildcard { .. }
+            | Expr::Placeholder { .. } => {
                 *self.is_applicable = false;
                 Recursion::Stop(self)
             }
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 596c1888f..bbfa1b6e1 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -344,6 +344,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
         Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
             "Create physical name does not support qualified wildcard".to_string(),
         )),
+        Expr::Placeholder { .. } => Err(DataFusionError::Internal(
+            "Create physical name does not support placeholder".to_string(),
+        )),
     }
 }
 
@@ -1031,6 +1034,14 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateExternalTable".to_string(),
                     ))
                 }
+                LogicalPlan::Prepare(_) => {
+                    // There is no default plan for "PREPARE" -- it must be
+                    // handled at a higher level (so that the appropriate
+                    // statement can be prepared)
+                    Err(DataFusionError::Internal(
+                        "Unsupported logical plan: Prepare".to_string(),
+                    ))
+                }
                 LogicalPlan::CreateCatalogSchema(_) => {
                     // There is no default plan for "CREATE SCHEMA".
                     // It must be handled at a higher level (so
diff --git a/datafusion/core/tests/sqllogictests/test_files/prepare.slt b/datafusion/core/tests/sqllogictests/test_files/prepare.slt
new file mode 100644
index 000000000..948a2e3bc
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/prepare.slt
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Prepare Statement Tests
+##########
+
+statement ok
+create table person (id int, first_name varchar, last_name varchar, age int, state varchar, salary double, birthday timestamp, "😀" int) as values (1, 'jane', 'smith', 20, 'MA', 100000.45, '2000-11-12T00:00:00'::timestamp, 99);
+
+query C rowsort
+select * from person;
+----
+1 jane smith 20 MA 100000.45 2000-11-12T00:00:00.000000000 99
+
+# Error due to syntax and semantic violation
+
+# Syntax error: no name specified after the keyword prepare
+statement error
+PREPARE AS SELECT id, age  FROM person WHERE age = $foo;
+
+# param following a non-number, $foo, not supported
+statement error
+PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = $foo;
+
+# not specify table hence cannot specify columns
+statement error
+PREPARE my_plan(INT) AS SELECT id + $1;
+
+# not specify data types for all params
+statement error
+PREPARE my_plan(INT) AS SELECT 1 + $1 + $2;
+
+# cannot use IS param
+statement error
+PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age is $1;
+
+# #######################
+# TODO: all the errors below should work ok after we store the prepare logical plan somewhere
+statement error
+PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);
+
+statement error
+PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = 10;
+
+statement error
+PREPARE my_plan AS SELECT id, age  FROM person WHERE age = 10;
+
+statement error
+PREPARE my_plan(INT) AS SELECT $1;
+
+statement error
+PREPARE my_plan(INT) AS SELECT 1 + $1;
+
+statement error
+PREPARE my_plan(INT, DOUBLE) AS SELECT 1 + $1 + $2;
+
+statement error
+PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = $1;
+
+statement error
+PREPARE my_plan(INT, STRING, DOUBLE, INT, DOUBLE, STRING) AS SELECT id, age, $6 FROM person WHERE age IN ($1, $4) AND salary > $3 and salary < $5 OR first_name < $2";
+
+statement error
+PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS SELECT id, SUM(age) FROM person WHERE salary > $2 GROUP BY id HAVING sum(age) < $1 AND SUM(age) > 10 OR SUM(age) in ($3, $4);
+
+statement error
+PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);
+
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 8d0d5c180..fbc98cf01 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -244,6 +244,14 @@ pub enum Expr {
     /// List of grouping set expressions. Only valid in the context of an aggregate
     /// GROUP BY expression list
     GroupingSet(GroupingSet),
+    /// A place holder for parameters in a prepared statement
+    /// (e.g. `$foo` or `$1`)
+    Placeholder {
+        /// The identifier of the parameter (e.g, $1 or $foo)
+        id: String,
+        /// The type the parameter will be filled in with
+        data_type: DataType,
+    },
 }
 
 /// Binary expression
@@ -528,6 +536,7 @@ impl Expr {
             Expr::Literal(..) => "Literal",
             Expr::Negative(..) => "Negative",
             Expr::Not(..) => "Not",
+            Expr::Placeholder { .. } => "Placeholder",
             Expr::QualifiedWildcard { .. } => "QualifiedWildcard",
             Expr::ScalarFunction { .. } => "ScalarFunction",
             Expr::ScalarSubquery { .. } => "ScalarSubquery",
@@ -980,6 +989,7 @@ impl fmt::Debug for Expr {
                     )
                 }
             },
+            Expr::Placeholder { id, .. } => write!(f, "{}", id),
         }
     }
 }
@@ -1263,6 +1273,7 @@ fn create_name(e: &Expr) -> Result<String> {
         Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
             "Create name does not support qualified wildcard".to_string(),
         )),
+        Expr::Placeholder { id, .. } => Ok((*id).to_string()),
     }
 }
 
diff --git a/datafusion/expr/src/expr_rewriter.rs b/datafusion/expr/src/expr_rewriter.rs
index fa7e00d0f..b107d5917 100644
--- a/datafusion/expr/src/expr_rewriter.rs
+++ b/datafusion/expr/src/expr_rewriter.rs
@@ -291,6 +291,7 @@ impl ExprRewritable for Expr {
                     key,
                 ))
             }
+            Expr::Placeholder { id, data_type } => Expr::Placeholder { id, data_type },
         };
 
         // now rewrite this expression itself
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 8424fa2aa..ae516001b 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -127,6 +127,7 @@ impl ExprSchemable for Expr {
             Expr::Like { .. } | Expr::ILike { .. } | Expr::SimilarTo { .. } => {
                 Ok(DataType::Boolean)
             }
+            Expr::Placeholder { data_type, .. } => Ok(data_type.clone()),
             Expr::Wildcard => Err(DataFusionError::Internal(
                 "Wildcard expressions are not valid in a logical query plan".to_owned(),
             )),
@@ -198,7 +199,8 @@ impl ExprSchemable for Expr {
             | Expr::IsNotTrue(_)
             | Expr::IsNotFalse(_)
             | Expr::IsNotUnknown(_)
-            | Expr::Exists { .. } => Ok(false),
+            | Expr::Exists { .. }
+            | Expr::Placeholder { .. } => Ok(true),
             Expr::InSubquery { expr, .. } => expr.nullable(input_schema),
             Expr::ScalarSubquery(subquery) => {
                 Ok(subquery.subquery.schema().field(0).is_nullable())
diff --git a/datafusion/expr/src/expr_visitor.rs b/datafusion/expr/src/expr_visitor.rs
index bd839f098..b5c6c6802 100644
--- a/datafusion/expr/src/expr_visitor.rs
+++ b/datafusion/expr/src/expr_visitor.rs
@@ -133,7 +133,8 @@ impl ExprVisitable for Expr {
             | Expr::Exists { .. }
             | Expr::ScalarSubquery(_)
             | Expr::Wildcard
-            | Expr::QualifiedWildcard { .. } => Ok(visitor),
+            | Expr::QualifiedWildcard { .. }
+            | Expr::Placeholder { .. } => Ok(visitor),
             Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
                 let visitor = left.accept(visitor)?;
                 right.accept(visitor)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index caeffa2de..28d3ccc91 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -26,9 +26,9 @@ use crate::{and, binary_expr, Operator};
 use crate::{
     logical_plan::{
         Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
-        JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection,
-        Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values,
-        Window,
+        JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
+        Projection, Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan,
+        Union, Values, Window,
     },
     utils::{
         can_hash, expand_qualified_wildcard, expand_wildcard,
@@ -118,6 +118,8 @@ impl LogicalPlanBuilder {
     /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
     /// The column names are not specified by the SQL standard and different database systems do it differently,
     /// so it's usually better to override the default names with a table alias list.
+    ///
+    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
     pub fn values(mut values: Vec<Vec<Expr>>) -> Result<Self> {
         if values.is_empty() {
             return Err(DataFusionError::Plan("Values list cannot be empty".into()));
@@ -279,6 +281,15 @@ impl LogicalPlanBuilder {
         )?)))
     }
 
+    /// Make a builder for a prepare logical plan from the builder's plan
+    pub fn prepare(&self, name: String, data_types: Vec<DataType>) -> Result<Self> {
+        Ok(Self::from(LogicalPlan::Prepare(Prepare {
+            name,
+            data_types,
+            input: Arc::new(self.plan.clone()),
+        })))
+    }
+
     /// Limit the number of rows returned
     ///
     /// `skip` - Number of rows to skip before fetch any row.
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 2cfe921e6..9d26d2a65 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -25,7 +25,7 @@ pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
     EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
-    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
+    LogicalPlan, Partitioning, PlanType, PlanVisitor, Prepare, Projection, Repartition,
     SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
     ToStringifiedPlan, Union, Values, Window,
 };
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index e7fa9c39d..7f38e7dbb 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -110,6 +110,8 @@ pub enum LogicalPlan {
     Distinct(Distinct),
     /// Set a Variable
     SetVariable(SetVariable),
+    /// Prepare a statement
+    Prepare(Prepare),
 }
 
 impl LogicalPlan {
@@ -136,6 +138,7 @@ impl LogicalPlan {
             LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
                 schema
             }
+            LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
             LogicalPlan::Explain(explain) => &explain.schema,
             LogicalPlan::Analyze(analyze) => &analyze.schema,
             LogicalPlan::Extension(extension) => extension.node.schema(),
@@ -203,8 +206,9 @@ impl LogicalPlan {
             | LogicalPlan::Sort(Sort { input, .. })
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
             | LogicalPlan::CreateView(CreateView { input, .. })
-            | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
-            LogicalPlan::Distinct(Distinct { input, .. }) => input.all_schemas(),
+            | LogicalPlan::Filter(Filter { input, .. })
+            | LogicalPlan::Distinct(Distinct { input, .. })
+            | LogicalPlan::Prepare(Prepare { input, .. }) => input.all_schemas(),
             LogicalPlan::DropTable(_)
             | LogicalPlan::DropView(_)
             | LogicalPlan::SetVariable(_) => vec![],
@@ -273,7 +277,8 @@ impl LogicalPlan {
             | LogicalPlan::Analyze(_)
             | LogicalPlan::Explain(_)
             | LogicalPlan::Union(_)
-            | LogicalPlan::Distinct(_) => {
+            | LogicalPlan::Distinct(_)
+            | LogicalPlan::Prepare(_) => {
                 vec![]
             }
         }
@@ -302,7 +307,8 @@ impl LogicalPlan {
             LogicalPlan::Explain(explain) => vec![&explain.plan],
             LogicalPlan::Analyze(analyze) => vec![&analyze.input],
             LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
-            | LogicalPlan::CreateView(CreateView { input, .. }) => {
+            | LogicalPlan::CreateView(CreateView { input, .. })
+            | LogicalPlan::Prepare(Prepare { input, .. }) => {
                 vec![input]
             }
             // plans without inputs
@@ -450,9 +456,8 @@ impl LogicalPlan {
                 input.accept(visitor)?
             }
             LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
-            | LogicalPlan::CreateView(CreateView { input, .. }) => {
-                input.accept(visitor)?
-            }
+            | LogicalPlan::CreateView(CreateView { input, .. })
+            | LogicalPlan::Prepare(Prepare { input, .. }) => input.accept(visitor)?,
             LogicalPlan::Extension(extension) => {
                 for input in extension.node.inputs() {
                     if !input.accept(visitor)? {
@@ -963,6 +968,11 @@ impl LogicalPlan {
                     LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                     LogicalPlan::Union(_) => write!(f, "Union"),
                     LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
+                    LogicalPlan::Prepare(Prepare {
+                        name, data_types, ..
+                    }) => {
+                        write!(f, "Prepare: {:?} {:?} ", name, data_types)
+                    }
                 }
             }
         }
@@ -1373,6 +1383,18 @@ pub struct CreateExternalTable {
     pub options: HashMap<String, String>,
 }
 
+/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
+/// `Expr::Placeholder` expressions that are filled in during execution
+#[derive(Clone)]
+pub struct Prepare {
+    /// The name of the statement
+    pub name: String,
+    /// Data types of the parameters ([`Expr::Placeholder`])
+    pub data_types: Vec<DataType>,
+    /// The logical plan of the statements
+    pub input: Arc<LogicalPlan>,
+}
+
 /// Produces a relation with string representations of
 /// various parts of the plan
 #[derive(Clone)]
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index fcb0365b5..88631cc6f 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -22,8 +22,8 @@ use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
 use crate::logical_plan::builder::build_join_schema;
 use crate::logical_plan::{
     Aggregate, Analyze, CreateMemoryTable, CreateView, Distinct, Extension, Filter, Join,
-    Limit, Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
-    Values, Window,
+    Limit, Partitioning, Prepare, Projection, Repartition, Sort, Subquery, SubqueryAlias,
+    Union, Values, Window,
 };
 use crate::{Cast, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
 use arrow::datatypes::{DataType, TimeUnit};
@@ -126,7 +126,8 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
             | Expr::ScalarSubquery(_)
             | Expr::Wildcard
             | Expr::QualifiedWildcard { .. }
-            | Expr::GetIndexedField { .. } => {}
+            | Expr::GetIndexedField { .. }
+            | Expr::Placeholder { .. } => {}
         }
         Ok(Recursion::Continue(self))
     }
@@ -579,6 +580,13 @@ pub fn from_plan(
 
             Ok(plan.clone())
         }
+        LogicalPlan::Prepare(Prepare {
+            name, data_types, ..
+        }) => Ok(LogicalPlan::Prepare(Prepare {
+            name: name.clone(),
+            data_types: data_types.clone(),
+            input: Arc::new(inputs[0].clone()),
+        })),
         LogicalPlan::EmptyRelation(_)
         | LogicalPlan::TableScan { .. }
         | LogicalPlan::CreateExternalTable(_)
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 11ed5cdbe..482298e16 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -240,7 +240,8 @@ impl OptimizerRule for CommonSubexprEliminate {
             | LogicalPlan::DropView(_)
             | LogicalPlan::SetVariable(_)
             | LogicalPlan::Distinct(_)
-            | LogicalPlan::Extension(_) => {
+            | LogicalPlan::Extension(_)
+            | LogicalPlan::Prepare(_) => {
                 // apply the optimization to all inputs of the plan
                 utils::optimize_children(self, plan, optimizer_config)
             }
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 3cedddc60..2d156d1ce 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -391,7 +391,8 @@ fn optimize_plan(
         | LogicalPlan::SetVariable(_)
         | LogicalPlan::CrossJoin(_)
         | LogicalPlan::Distinct(_)
-        | LogicalPlan::Extension { .. } => {
+        | LogicalPlan::Extension { .. }
+        | LogicalPlan::Prepare(_) => {
             let expr = plan.expressions();
             // collect all required columns by this plan
             exprlist_to_columns(&expr, &mut new_required_columns)?;
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index b32fc53db..3a51099fe 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -253,7 +253,8 @@ impl<'a> ConstEvaluator<'a> {
             | Expr::Sort { .. }
             | Expr::GroupingSet(_)
             | Expr::Wildcard
-            | Expr::QualifiedWildcard { .. } => false,
+            | Expr::QualifiedWildcard { .. }
+            | Expr::Placeholder { .. } => false,
             Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()),
             Expr::ScalarUDF { fun, .. } => Self::volatility_ok(fun.signature.volatility),
             Expr::Literal(_)
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index d5284ef59..97ba57a7e 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -71,6 +71,7 @@ message LogicalPlanNode {
     DistinctNode distinct = 23;
     ViewTableScanNode view_scan = 24;
     CustomTableScanNode custom_scan = 25;
+    PrepareNode prepare = 26;
   }
 }
 
@@ -181,6 +182,12 @@ message CreateExternalTableNode {
   map<string, string> options = 11;
 }
 
+message PrepareNode {
+  string name = 1;
+  repeated ArrowType data_types = 2;
+  LogicalPlanNode input = 3;
+ }
+
 message CreateCatalogSchemaNode {
   string schema_name = 1;
   bool if_not_exists = 2;
@@ -345,9 +352,16 @@ message LogicalExprNode {
     ILikeNode ilike = 32;
     SimilarToNode similar_to = 33;
 
+    PlaceholderNode placeholder = 34;
+
   }
 }
 
+message PlaceholderNode {
+  string id = 1;
+  ArrowType data_type = 2;
+}
+
 message LogicalExprList {
   repeated LogicalExprNode expr = 1;
 }
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 98ffbd240..5d0226958 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -19,7 +19,7 @@ use crate::protobuf::plan_type::PlanTypeEnum::{
     FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
     OptimizedLogicalPlan, OptimizedPhysicalPlan,
 };
-use crate::protobuf::{self};
+use crate::protobuf::{self, PlaceholderNode};
 use crate::protobuf::{
     CubeNode, GroupingSetNode, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
     RollupNode,
@@ -1223,6 +1223,16 @@ pub fn parse_expr(
                     .collect::<Result<Vec<_>, Error>>()?,
             )))
         }
+        ExprType::Placeholder(PlaceholderNode { id, data_type }) => match data_type {
+            None => {
+                let message = format!("Protobuf deserialization error: data type must be provided for the placeholder {}", id);
+                Err(proto_error(message))
+            }
+            Some(data_type) => Ok(Expr::Placeholder {
+                id: id.clone(),
+                data_type: data_type.try_into()?,
+            }),
+        },
     }
 }
 
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 97b796257..13236a935 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -11040,6 +11040,9 @@ impl serde::Serialize for LogicalExprNode {
                 logical_expr_node::ExprType::SimilarTo(v) => {
                     struct_ser.serialize_field("similarTo", v)?;
                 }
+                logical_expr_node::ExprType::Placeholder(v) => {
+                    struct_ser.serialize_field("placeholder", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -11106,6 +11109,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
             "ilike",
             "similar_to",
             "similarTo",
+            "placeholder",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -11143,6 +11147,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
             Like,
             Ilike,
             SimilarTo,
+            Placeholder,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -11197,6 +11202,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
                             "like" => Ok(GeneratedField::Like),
                             "ilike" => Ok(GeneratedField::Ilike),
                             "similarTo" | "similar_to" => Ok(GeneratedField::SimilarTo),
+                            "placeholder" => Ok(GeneratedField::Placeholder),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -11447,6 +11453,13 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
                                 return Err(serde::de::Error::duplicate_field("similarTo"));
                             }
                             expr_type__ = map.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::SimilarTo)
+;
+                        }
+                        GeneratedField::Placeholder => {
+                            if expr_type__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("placeholder"));
+                            }
+                            expr_type__ = map.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::Placeholder)
 ;
                         }
                     }
@@ -11655,6 +11668,9 @@ impl serde::Serialize for LogicalPlanNode {
                 logical_plan_node::LogicalPlanType::CustomScan(v) => {
                     struct_ser.serialize_field("customScan", v)?;
                 }
+                logical_plan_node::LogicalPlanType::Prepare(v) => {
+                    struct_ser.serialize_field("prepare", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -11701,6 +11717,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
             "viewScan",
             "custom_scan",
             "customScan",
+            "prepare",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -11729,6 +11746,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
             Distinct,
             ViewScan,
             CustomScan,
+            Prepare,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -11774,6 +11792,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
                             "distinct" => Ok(GeneratedField::Distinct),
                             "viewScan" | "view_scan" => Ok(GeneratedField::ViewScan),
                             "customScan" | "custom_scan" => Ok(GeneratedField::CustomScan),
+                            "prepare" => Ok(GeneratedField::Prepare),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -11962,6 +11981,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
                                 return Err(serde::de::Error::duplicate_field("customScan"));
                             }
                             logical_plan_type__ = map.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::CustomScan)
+;
+                        }
+                        GeneratedField::Prepare => {
+                            if logical_plan_type__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("prepare"));
+                            }
+                            logical_plan_type__ = map.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::Prepare)
 ;
                         }
                     }
@@ -16419,6 +16445,115 @@ impl<'de> serde::Deserialize<'de> for PhysicalWindowExprNode {
         deserializer.deserialize_struct("datafusion.PhysicalWindowExprNode", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for PlaceholderNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.id.is_empty() {
+            len += 1;
+        }
+        if self.data_type.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.PlaceholderNode", len)?;
+        if !self.id.is_empty() {
+            struct_ser.serialize_field("id", &self.id)?;
+        }
+        if let Some(v) = self.data_type.as_ref() {
+            struct_ser.serialize_field("dataType", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PlaceholderNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "id",
+            "data_type",
+            "dataType",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Id,
+            DataType,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "id" => Ok(GeneratedField::Id),
+                            "dataType" | "data_type" => Ok(GeneratedField::DataType),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PlaceholderNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.PlaceholderNode")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<PlaceholderNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut id__ = None;
+                let mut data_type__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Id => {
+                            if id__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("id"));
+                            }
+                            id__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::DataType => {
+                            if data_type__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("dataType"));
+                            }
+                            data_type__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(PlaceholderNode {
+                    id: id__.unwrap_or_default(),
+                    data_type: data_type__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.PlaceholderNode", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PlanType {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -16580,6 +16715,132 @@ impl<'de> serde::Deserialize<'de> for PlanType {
         deserializer.deserialize_struct("datafusion.PlanType", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for PrepareNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.name.is_empty() {
+            len += 1;
+        }
+        if !self.data_types.is_empty() {
+            len += 1;
+        }
+        if self.input.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.PrepareNode", len)?;
+        if !self.name.is_empty() {
+            struct_ser.serialize_field("name", &self.name)?;
+        }
+        if !self.data_types.is_empty() {
+            struct_ser.serialize_field("dataTypes", &self.data_types)?;
+        }
+        if let Some(v) = self.input.as_ref() {
+            struct_ser.serialize_field("input", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PrepareNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "name",
+            "data_types",
+            "dataTypes",
+            "input",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Name,
+            DataTypes,
+            Input,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "name" => Ok(GeneratedField::Name),
+                            "dataTypes" | "data_types" => Ok(GeneratedField::DataTypes),
+                            "input" => Ok(GeneratedField::Input),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PrepareNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.PrepareNode")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<PrepareNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut name__ = None;
+                let mut data_types__ = None;
+                let mut input__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Name => {
+                            if name__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("name"));
+                            }
+                            name__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::DataTypes => {
+                            if data_types__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("dataTypes"));
+                            }
+                            data_types__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::Input => {
+                            if input__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("input"));
+                            }
+                            input__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(PrepareNode {
+                    name: name__.unwrap_or_default(),
+                    data_types: data_types__.unwrap_or_default(),
+                    input: input__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.PrepareNode", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for ProjectionColumns {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 6bfb6b96c..1405e1eba 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -33,7 +33,7 @@ pub struct DfSchema {
 pub struct LogicalPlanNode {
     #[prost(
         oneof = "logical_plan_node::LogicalPlanType",
-        tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25"
+        tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26"
     )]
     pub logical_plan_type: ::core::option::Option<logical_plan_node::LogicalPlanType>,
 }
@@ -89,6 +89,8 @@ pub mod logical_plan_node {
         ViewScan(::prost::alloc::boxed::Box<super::ViewTableScanNode>),
         #[prost(message, tag = "25")]
         CustomScan(super::CustomTableScanNode),
+        #[prost(message, tag = "26")]
+        Prepare(::prost::alloc::boxed::Box<super::PrepareNode>),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -275,6 +277,15 @@ pub struct CreateExternalTableNode {
     >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PrepareNode {
+    #[prost(string, tag = "1")]
+    pub name: ::prost::alloc::string::String,
+    #[prost(message, repeated, tag = "2")]
+    pub data_types: ::prost::alloc::vec::Vec<ArrowType>,
+    #[prost(message, optional, boxed, tag = "3")]
+    pub input: ::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CreateCatalogSchemaNode {
     #[prost(string, tag = "1")]
     pub schema_name: ::prost::alloc::string::String,
@@ -406,7 +417,7 @@ pub struct SubqueryAliasNode {
 pub struct LogicalExprNode {
     #[prost(
         oneof = "logical_expr_node::ExprType",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34"
     )]
     pub expr_type: ::core::option::Option<logical_expr_node::ExprType>,
 }
@@ -488,9 +499,18 @@ pub mod logical_expr_node {
         Ilike(::prost::alloc::boxed::Box<super::ILikeNode>),
         #[prost(message, tag = "33")]
         SimilarTo(::prost::alloc::boxed::Box<super::SimilarToNode>),
+        #[prost(message, tag = "34")]
+        Placeholder(super::PlaceholderNode),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PlaceholderNode {
+    #[prost(string, tag = "1")]
+    pub id: ::prost::alloc::string::String,
+    #[prost(message, optional, tag = "2")]
+    pub data_type: ::core::option::Option<ArrowType>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LogicalExprList {
     #[prost(message, repeated, tag = "1")]
     pub expr: ::prost::alloc::vec::Vec<LogicalExprNode>,
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index f0b6d1095..d9423f8de 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -25,7 +25,7 @@ use crate::{
     },
     to_proto,
 };
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::datasource::TableProvider;
 use datafusion::{
     datasource::{
@@ -39,7 +39,7 @@ use datafusion::{
     prelude::SessionContext,
 };
 use datafusion_common::{context, Column, DataFusionError, OwnedTableReference};
-use datafusion_expr::logical_plan::builder::project;
+use datafusion_expr::logical_plan::{builder::project, Prepare};
 use datafusion_expr::{
     logical_plan::{
         Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
@@ -816,6 +816,18 @@ impl AsLogicalPlan for LogicalPlanNode {
                 )?
                 .build()
             }
+            LogicalPlanType::Prepare(prepare) => {
+                let input: LogicalPlan =
+                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
+                let data_types: Vec<DataType> = prepare
+                    .data_types
+                    .iter()
+                    .map(DataType::try_from)
+                    .collect::<Result<_, _>>()?;
+                LogicalPlanBuilder::from(input)
+                    .prepare(prepare.name.clone(), data_types)?
+                    .build()
+            }
         }
     }
 
@@ -1377,6 +1389,28 @@ impl AsLogicalPlan for LogicalPlanNode {
                     )),
                 })
             }
+            LogicalPlan::Prepare(Prepare {
+                name,
+                data_types,
+                input,
+            }) => {
+                let input = protobuf::LogicalPlanNode::try_from_logical_plan(
+                    input,
+                    extension_codec,
+                )?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
+                        protobuf::PrepareNode {
+                            name: name.clone(),
+                            data_types: data_types
+                                .iter()
+                                .map(|t| t.try_into())
+                                .collect::<Result<Vec<_>, _>>()?,
+                            input: Some(Box::new(input)),
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::CreateMemoryTable(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for CreateMemoryTable",
             )),
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index 133e2f89d..4c280f7b0 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -27,7 +27,7 @@ use crate::protobuf::{
         OptimizedLogicalPlan, OptimizedPhysicalPlan,
     },
     CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList, OptimizedLogicalPlanType,
-    OptimizedPhysicalPlanType, RollupNode,
+    OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
 };
 use arrow::datatypes::{
     DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit,
@@ -888,6 +888,9 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
                         .collect::<Result<Vec<_>, Self::Error>>()?,
                 })),
             },
+            Expr::Placeholder{ id, data_type } => Self {
+                expr_type: Some(ExprType::Placeholder(PlaceholderNode { id: id.clone(), data_type: Some(data_type.try_into()?) })),
+            },
 
             Expr::QualifiedWildcard { .. } | Expr::TryCast { .. } =>
                 return Err(Error::General("Proto serialization error: Expr::QualifiedWildcard { .. } | Expr::TryCast { .. } not supported".to_string())),
diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml
index decec7075..5139bd2b7 100644
--- a/datafusion/sql/Cargo.toml
+++ b/datafusion/sql/Cargo.toml
@@ -40,4 +40,5 @@ unicode_expressions = []
 arrow-schema = "28.0.0"
 datafusion-common = { path = "../common", version = "15.0.0" }
 datafusion-expr = { path = "../expr", version = "15.0.0" }
+log = "^0.4"
 sqlparser = "0.27"
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 8f0129c5c..82d8c3834 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 //! SQL Query Planner (produces logical plan from SQL AST)
-
+use log::debug;
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
 use std::sync::Arc;
@@ -46,7 +46,6 @@ use datafusion_expr::expr::{Between, BinaryExpr, Case, Cast, GroupingSet, Like};
 use datafusion_expr::expr_rewriter::normalize_col;
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
 use datafusion_expr::logical_plan::builder::project;
-use datafusion_expr::logical_plan::Join as HashJoin;
 use datafusion_expr::logical_plan::JoinConstraint as HashJoinConstraint;
 use datafusion_expr::logical_plan::{
     Analyze, CreateCatalog, CreateCatalogSchema,
@@ -55,6 +54,7 @@ use datafusion_expr::logical_plan::{
     Partitioning, PlanType, SetVariable, ToStringifiedPlan,
 };
 use datafusion_expr::logical_plan::{Filter, Subquery};
+use datafusion_expr::logical_plan::{Join as HashJoin, Prepare};
 use datafusion_expr::utils::{
     can_hash, check_all_column_from_schema, expand_qualified_wildcard, expand_wildcard,
     expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_column_exprs,
@@ -120,13 +120,23 @@ impl Default for PlannerContext {
 }
 
 impl PlannerContext {
-    /// Create a new PlannerContext
+    /// Create an empty PlannerContext
     pub fn new() -> Self {
         Self {
             prepare_param_data_types: vec![],
             ctes: HashMap::new(),
         }
     }
+
+    /// Create a new PlannerContext with provided prepare_param_data_types
+    pub fn new_with_prepare_param_data_types(
+        prepare_param_data_types: Vec<DataType>,
+    ) -> Self {
+        Self {
+            prepare_param_data_types,
+            ctes: HashMap::new(),
+        }
+    }
 }
 
 /// SQL query planner
@@ -197,6 +207,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
     /// Generate a logical plan from an SQL statement
     pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
+        self.sql_statement_to_plan_with_context(statement, &mut PlannerContext::new())
+    }
+
+    /// Generate a logical plan from an SQL statement
+    pub fn sql_statement_to_plan_with_context(
+        &self,
+        statement: Statement,
+        planner_context: &mut PlannerContext,
+    ) -> Result<LogicalPlan> {
         let sql = Some(statement.to_string());
         match statement {
             Statement::Explain {
@@ -207,9 +226,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 describe_alias: _,
                 ..
             } => self.explain_statement_to_plan(verbose, analyze, *statement),
-            Statement::Query(query) => {
-                self.query_to_plan(*query, &mut PlannerContext::new())
-            }
+            Statement::Query(query) => self.query_to_plan(*query, planner_context),
             Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
             Statement::SetVariable {
                 local,
@@ -232,7 +249,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 && table_properties.is_empty()
                 && with_options.is_empty() =>
             {
-                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
+                let plan = self.query_to_plan(*query, planner_context)?;
                 let input_schema = plan.schema();
 
                 let plan = if !columns.is_empty() {
@@ -323,7 +340,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             } => {
                 // We don't support cascade and purge for now.
                 // nor do we support multiple object names
-
                 let name = match names.len() {
                     0 => Err(ParserError("Missing table name.".to_string()).into()),
                     1 => object_name_to_table_reference(names.pop().unwrap()),
@@ -350,6 +366,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     )),
                 }
             }
+            Statement::Prepare {
+                name,
+                data_types,
+                statement,
+            } => {
+                // Convert parser data types to DataFusion data types
+                let data_types: Vec<DataType> = data_types
+                    .into_iter()
+                    .map(|t| self.convert_data_type(&t))
+                    .collect::<Result<_>>()?;
+
+                // Create planner context with parameters
+                let mut planner_context =
+                    PlannerContext::new_with_prepare_param_data_types(data_types.clone());
+
+                // Build logical plan for inner statement of the prepare statement
+                let plan = self.sql_statement_to_plan_with_context(
+                    *statement,
+                    &mut planner_context,
+                )?;
+                Ok(LogicalPlan::Prepare(Prepare {
+                    name: name.to_string(),
+                    data_types,
+                    input: Arc::new(plan),
+                }))
+            }
 
             Statement::ShowTables {
                 extended,
@@ -483,7 +525,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             SetExpr::Select(s) => {
                 self.select_to_plan(*s, planner_context, alias, outer_query_schema)
             }
-            SetExpr::Values(v) => self.sql_values_to_plan(v),
+            SetExpr::Values(v) => {
+                self.sql_values_to_plan(v, &planner_context.prepare_param_data_types)
+            }
             SetExpr::SetOperation {
                 op,
                 left,
@@ -1088,6 +1132,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // process `from` clause
         let plan =
             self.plan_from_tables(select.from, planner_context, outer_query_schema)?;
+
         let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
         // build from schema for unqualifier column ambiguous check
         // we should get only one field for unqualifier column from schema.
@@ -1786,7 +1831,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
-    fn sql_values_to_plan(&self, values: SQLValues) -> Result<LogicalPlan> {
+    fn sql_values_to_plan(
+        &self,
+        values: SQLValues,
+        param_data_types: &[DataType],
+    ) -> Result<LogicalPlan> {
         // values should not be based on any other schema
         let schema = DFSchema::empty();
         let values = values
@@ -1803,6 +1852,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             Ok(Expr::Literal(ScalarValue::Null))
                         }
                         SQLExpr::Value(Value::Boolean(n)) => Ok(lit(n)),
+                        SQLExpr::Value(Value::Placeholder(param)) => {
+                            Self::create_placeholder_expr(param, param_data_types)
+                        }
                         SQLExpr::UnaryOp { op, expr } => self.parse_sql_unary_op(
                             op,
                             *expr,
@@ -1842,6 +1894,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         LogicalPlanBuilder::values(values)?.build()
     }
 
+    /// Create a placeholder expression
+    /// This is the same as Postgres's prepare statement syntax in which a placeholder starts with `$` sign and then
+    /// number 1, 2, ... etc. For example, `$1` is the first placeholder; $2 is the second one and so on.
+    ///
+    /// Returns an error if `param` is not a leading character followed by a number, if the
+    /// number is zero (placeholders are 1-based), or if it refers to a parameter beyond the
+    /// end of `param_data_types`.
+    fn create_placeholder_expr(
+        param: String,
+        param_data_types: &[DataType],
+    ) -> Result<Expr> {
+        // Parse the placeholder as a number because it is the only support from sqlparser and postgres.
+        // `get(1..)` avoids a slicing panic if the placeholder string is ever empty.
+        let index = param
+            .get(1..)
+            .and_then(|digits| digits.parse::<usize>().ok())
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Invalid placeholder, not a number: {}",
+                    param
+                ))
+            })?;
+        // Placeholders are 1-based (`$1` is the first parameter), so `$0` names no
+        // parameter; reject it instead of underflowing on the conversion to 0-based.
+        let idx = index.checked_sub(1).ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Invalid placeholder, zero is not a valid index: {}",
+                param
+            ))
+        })?;
+        // Look up the data type of the parameter, failing if it was not declared
+        let param_type = param_data_types.get(idx).cloned().ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Placehoder {} does not exist in the parameter list: {:?}",
+                param, param_data_types
+            ))
+        })?;
+        debug!(
+            "type of param {} param_data_types[idx]: {:?}",
+            param, param_type
+        );
+
+        Ok(Expr::Placeholder {
+            id: param,
+            data_type: param_type,
+        })
+    }
+
     fn sql_expr_to_logical_expr(
         &self,
         sql: SQLExpr,
@@ -1853,6 +1943,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             SQLExpr::Value(Value::SingleQuotedString(ref s) | Value::DoubleQuotedString(ref s)) => Ok(lit(s.clone())),
             SQLExpr::Value(Value::Boolean(n)) => Ok(lit(n)),
             SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Null)),
+            SQLExpr::Value(Value::Placeholder(param)) => Self::create_placeholder_expr(param, &planner_context.prepare_param_data_types),
             SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
                 fun: BuiltinScalarFunction::DatePart,
                 args: vec![
@@ -5326,6 +5417,21 @@ mod tests {
         assert_eq!(format!("{:?}", plan), expected);
     }
 
+    fn prepare_stmt_quick_test(
+        sql: &str,
+        expected_plan: &str,
+        expected_data_types: &str,
+    ) {
+        let plan = logical_plan(sql).unwrap();
+        // verify plan
+        assert_eq!(format!("{:?}", plan), expected_plan);
+        // verify data types
+        if let LogicalPlan::Prepare(Prepare { data_types, .. }) = plan {
+            let dt = format!("{:?}", data_types);
+            assert_eq!(dt, expected_data_types);
+        }
+    }
+
     struct MockContextProvider {}
 
     impl ContextProvider for MockContextProvider {
@@ -6125,6 +6231,199 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    #[should_panic(
+        expected = "value: Internal(\"Invalid placeholder, not a number: $foo\""
+    )]
+    fn test_prepare_statement_to_plan_panic_param_format() {
+        // param is not number following the $ sign
+        // panic due to Internal error returned by the planner (create_placeholder_expr)
+        let sql = "PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = $foo";
+
+        let expected_plan = "whatever";
+        let expected_dt = "whatever";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    #[should_panic(expected = "value: SQL(ParserError(\"Expected AS, found: SELECT\"))")]
+    fn test_prepare_statement_to_plan_panic_prepare_wrong_syntax() {
+        // statement name is missing: the parser takes `AS` as the name and then
+        // panics on the parser error because the next token is SELECT, not AS
+        let sql = "PREPARE AS SELECT id, age  FROM person WHERE age = $foo";
+
+        let expected_plan = "whatever";
+        let expected_dt = "whatever";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "value: SchemaError(FieldNotFound { field: Column { relation: None, name: \"id\" }, valid_fields: Some([]) })"
+    )]
+    fn test_prepare_statement_to_plan_panic_no_relation_and_constant_param() {
+        let sql = "PREPARE my_plan(INT) AS SELECT id + $1";
+
+        let expected_plan = "whatever";
+        let expected_dt = "whatever";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "value: Internal(\"Placehoder $2 does not exist in the parameter list: [Int32]\")"
+    )]
+    fn test_prepare_statement_to_plan_panic_no_data_types() {
+        // only provide 1 data type while using 2 params
+        let sql = "PREPARE my_plan(INT) AS SELECT 1 + $1 + $2";
+
+        let expected_plan = "whatever";
+        let expected_dt = "whatever";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "value: SQL(ParserError(\"Expected [NOT] NULL or TRUE|FALSE or [NOT] DISTINCT FROM after IS, found: $1\""
+    )]
+    fn test_prepare_statement_to_plan_panic_is_param() {
+        let sql = "PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age is $1";
+
+        let expected_plan = "whatever";
+        let expected_dt = "whatever";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    fn test_prepare_statement_to_plan_no_param() {
+        // no embedded parameter but still declare it
+        let sql = "PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = 10";
+
+        let expected_plan = "Prepare: \"my_plan\" [Int32] \
+        \n  Projection: person.id, person.age\
+        \n    Filter: person.age = Int64(10)\
+        \n      TableScan: person";
+
+        let expected_dt = "[Int32]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+
+        /////////////////////////
+        // no embedded parameter and no declare it
+        let sql = "PREPARE my_plan AS SELECT id, age  FROM person WHERE age = 10";
+
+        let expected_plan = "Prepare: \"my_plan\" [] \
+        \n  Projection: person.id, person.age\
+        \n    Filter: person.age = Int64(10)\
+        \n      TableScan: person";
+
+        let expected_dt = "[]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    fn test_prepare_statement_to_plan_params_as_constants() {
+        let sql = "PREPARE my_plan(INT) AS SELECT $1";
+
+        let expected_plan = "Prepare: \"my_plan\" [Int32] \
+        \n  Projection: $1\n    EmptyRelation";
+        let expected_dt = "[Int32]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+
+        /////////////////////////
+        let sql = "PREPARE my_plan(INT) AS SELECT 1 + $1";
+
+        let expected_plan = "Prepare: \"my_plan\" [Int32] \
+        \n  Projection: Int64(1) + $1\n    EmptyRelation";
+        let expected_dt = "[Int32]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+
+        /////////////////////////
+        let sql = "PREPARE my_plan(INT, DOUBLE) AS SELECT 1 + $1 + $2";
+
+        let expected_plan = "Prepare: \"my_plan\" [Int32, Float64] \
+        \n  Projection: Int64(1) + $1 + $2\n    EmptyRelation";
+        let expected_dt = "[Int32, Float64]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    fn test_prepare_statement_to_plan_one_param() {
+        let sql = "PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = $1";
+
+        let expected_plan = "Prepare: \"my_plan\" [Int32] \
+        \n  Projection: person.id, person.age\
+        \n    Filter: person.age = $1\
+        \n      TableScan: person";
+
+        let expected_dt = "[Int32]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    fn test_prepare_statement_to_plan_multi_params() {
+        let sql = "PREPARE my_plan(INT, STRING, DOUBLE, INT, DOUBLE, STRING) AS 
+        SELECT id, age, $6  
+        FROM person 
+        WHERE age IN ($1, $4) AND salary > $3 and salary < $5 OR first_name < $2";
+
+        let expected_plan = "Prepare: \"my_plan\" [Int32, Utf8, Float64, Int32, Float64, Utf8] \
+        \n  Projection: person.id, person.age, $6\
+        \n    Filter: person.age IN ([$1, $4]) AND person.salary > $3 AND person.salary < $5 OR person.first_name < $2\
+        \n      TableScan: person";
+
+        let expected_dt = "[Int32, Utf8, Float64, Int32, Float64, Utf8]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    fn test_prepare_statement_to_plan_having() {
+        let sql = "PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS 
+        SELECT id, SUM(age)  
+        FROM person \
+        WHERE salary > $2 
+        GROUP BY id 
+        HAVING sum(age) < $1 AND SUM(age) > 10 OR SUM(age) in ($3, $4)\
+        ";
+
+        let expected_plan = "Prepare: \"my_plan\" [Int32, Float64, Float64, Float64] \
+        \n  Projection: person.id, SUM(person.age)\
+        \n    Filter: SUM(person.age) < $1 AND SUM(person.age) > Int64(10) OR SUM(person.age) IN ([$3, $4])\
+        \n      Aggregate: groupBy=[[person.id]], aggr=[[SUM(person.age)]]\
+        \n        Filter: person.salary > $2\
+        \n          TableScan: person";
+
+        let expected_dt = "[Int32, Float64, Float64, Float64]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
+    #[test]
+    fn test_prepare_statement_to_plan_value_list() {
+        let sql = "PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);";
+
+        let expected_plan = "Prepare: \"my_plan\" [Utf8, Utf8] \
+        \n  Projection: num, letter\
+        \n    Projection: t.column1 AS num, t.column2 AS letter\
+        \n      SubqueryAlias: t\
+        \n        Values: (Int64(1), $1), (Int64(2), $2)";
+
+        let expected_dt = "[Utf8, Utf8]";
+
+        prepare_stmt_quick_test(sql, expected_plan, expected_dt);
+    }
+
     #[test]
     fn test_table_alias() {
         let sql = "select * from (\
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 6a0dd7c3f..4b9ae3ae9 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -411,6 +411,10 @@ where
                     )))
                 }
             },
+            Expr::Placeholder { id, data_type } => Ok(Expr::Placeholder {
+                id: id.clone(),
+                data_type: data_type.clone(),
+            }),
         },
     }
 }