Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/11 05:03:29 UTC

[spark] branch master updated: [SPARK-43884] Param markers in DDL

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 616c492aa1f [SPARK-43884] Param markers in DDL
616c492aa1f is described below

commit 616c492aa1f4915e85b7f153d31ec7cdfe02202a
Author: srielau <se...@rielau.com>
AuthorDate: Sat Jun 10 22:03:14 2023 -0700

    [SPARK-43884] Param markers in DDL
    
    ### What changes were proposed in this pull request?
    
    In this change we allow parameter markers (e.g. `:param`) to be used in various DDL statements, specifically as input to the IDENTIFIER clause, to allow templating of statements such as `CREATE TABLE IDENTIFIER(:mytab)`.
    We block parameter markers in view bodies to prevent them from appearing in the persisted view definition.
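    
    As an illustration, here is a minimal sketch of the resulting behavior, drawn from the tests added in this patch (it assumes an active `SparkSession` named `spark` and the named-parameter `sql` API):
    
    ```scala
    // Allowed after this change: a parameter marker feeding the IDENTIFIER
    // clause, templating the object name used in a DDL statement.
    spark.sql("CREATE VIEW IDENTIFIER(:p1)(c1) AS SELECT 1", args = Map("p1" -> "v"))
    spark.sql("DROP VIEW IDENTIFIER(:p1)", args = Map("p1" -> "v"))
    
    // Still blocked: a parameter marker inside the view body itself, because
    // the original query text is persisted and the argument could not be
    // resolved when the view is parsed back. This now fails with
    // UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT.
    spark.sql("CREATE VIEW v AS SELECT :p AS p", args = Map("p" -> 1))
    ```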
    
    ### Why are the changes needed?
    
    This change is needed to allow the IDENTIFIER clause to be used in DDL statements.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Parameter markers are now accepted in DDL statements (via the IDENTIFIER clause), and using one in a view body now fails with a clearer error message.
    
    ### How was this patch tested?
    
    Added new tests to `ParametersSuite`.
    
    Closes #41395 from srielau/SPARK-43884-param-markers-in-ddl.
    
    Lead-authored-by: srielau <se...@rielau.com>
    Co-authored-by: Gengliang Wang <ge...@apache.org>
    Co-authored-by: Serge Rielau <sr...@users.noreply.github.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 core/src/main/resources/error/error-classes.json   |  2 +-
 .../spark/sql/catalyst/analysis/parameters.scala   | 19 +--------
 .../spark/sql/errors/QueryParsingErrors.scala      |  9 ++++
 .../spark/sql/execution/SparkSqlParser.scala       | 28 ++++++++++++-
 .../analyzer-results/identifier-clause.sql.out     | 10 ++---
 .../sql-tests/inputs/identifier-clause.sql         |  2 +-
 .../sql-tests/results/identifier-clause.sql.out    | 10 ++---
 .../org/apache/spark/sql/ParametersSuite.scala     | 49 ++++++++++++++++++++--
 8 files changed, 92 insertions(+), 37 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 8c3c076ce74..7b39ab7266c 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -2335,7 +2335,7 @@
       },
       "PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT" : {
         "message" : [
-          "Parameter markers in unexpected statement: <statement>. Parameter markers must only be used in a query, or DML statement."
+          "Parameter markers are not allowed in <statement>."
         ]
       },
       "PARTITION_WITH_NESTED_COLUMN_IS_UNSUPPORTED" : {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index 29c36300673..2a31e90465c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, LeafExpression, Literal, SubqueryExpression, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, InsertIntoStatement, LogicalPlan, MergeIntoTable, UnaryNode, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
 import org.apache.spark.sql.errors.QueryErrorsBase
@@ -72,23 +72,6 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
       // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
       // relations are not children of `UnresolvedWith`.
       case p @ ParameterizedQuery(child, args) if !child.containsPattern(UNRESOLVED_WITH) =>
-        // Some commands may store the original SQL text, like CREATE VIEW, GENERATED COLUMN, etc.
-        // We can't store the original SQL text with parameters, as we don't store the arguments and
-        // are not able to resolve it after parsing it back. Since parameterized query is mostly
-        // used to avoid SQL injection for SELECT queries, we simply forbid non-DML commands here.
-        child match {
-          case _: InsertIntoStatement => // OK
-          case _: UpdateTable => // OK
-          case _: DeleteFromTable => // OK
-          case _: MergeIntoTable => // OK
-          case cmd: Command =>
-            child.failAnalysis(
-              errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
-              messageParameters = Map("statement" -> cmd.nodeName)
-            )
-          case _ => // OK
-        }
-
         args.find(!_._2.isInstanceOf[Literal]).foreach { case (name, expr) =>
           expr.failAnalysis(
             errorClass = "INVALID_SQL_ARG",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 0eafd2bf278..d2831f27e37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -530,6 +530,15 @@ private[sql] object QueryParsingErrors extends QueryErrorsBase {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx)
   }
 
+  def parameterMarkerNotAllowed(statement: String, origin: Origin): Throwable = {
+    new ParseException(
+      command = origin.sqlText,
+      start = origin,
+      stop = origin,
+      errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
+      messageParameters = Map("statement" -> statement))
+  }
+
   def defineTempViewWithIfNotExistsError(ctx: CreateViewContext): Throwable = {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0053", ctx)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c59444f3661..e3ae1b83a16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
 import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.execution.command._
@@ -440,6 +441,21 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
 
+  private def checkInvalidParameter(plan: LogicalPlan, statement: String):
+  Unit = {
+    plan.foreach { p =>
+      p.expressions.foreach { expr =>
+        if (expr.containsPattern(PARAMETER)) {
+          throw QueryParsingErrors.parameterMarkerNotAllowed(statement, p.origin)
+        }
+      }
+    }
+    plan.children.foreach(p => checkInvalidParameter(p, statement))
+    plan.innerChildren.collect {
+      case child: LogicalPlan => checkInvalidParameter(child, statement)
+    }
+  }
+
   /**
    * Create or replace a view. This creates a [[CreateViewCommand]].
    *
@@ -488,6 +504,14 @@ class SparkSqlAstBuilder extends AstBuilder {
     } else {
       LocalTempView
     }
+    val qPlan: LogicalPlan = plan(ctx.query)
+
+    // Disallow parameter markers in the body of the view.
+    // We need this limitation because we store the original query text, pre substitution.
+    // To lift this we would need to reconstitute the body with parameter markers replaced with the
+    // values given at CREATE VIEW time, or we would need to store the parameter values alongside
+    // the text.
+    checkInvalidParameter(qPlan, "CREATE VIEW body")
     if (viewType == PersistedView) {
       val originalText = source(ctx.query)
       assert(Option(originalText).isDefined,
@@ -498,7 +522,7 @@ class SparkSqlAstBuilder extends AstBuilder {
         visitCommentSpecList(ctx.commentSpec()),
         properties,
         Some(originalText),
-        plan(ctx.query),
+        qPlan,
         ctx.EXISTS != null,
         ctx.REPLACE != null)
     } else {
@@ -522,7 +546,7 @@ class SparkSqlAstBuilder extends AstBuilder {
           visitCommentSpecList(ctx.commentSpec()),
           properties,
           Option(source(ctx.query)),
-          plan(ctx.query),
+          qPlan,
           ctx.EXISTS != null,
           ctx.REPLACE != null,
           viewType = viewType)
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
index 98a65374619..7c98d4d1670 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
@@ -116,15 +116,13 @@ org.apache.spark.SparkUnsupportedOperationException
 
 -- !query
 MERGE INTO IDENTIFIER('ta' || 'b') AS t USING IDENTIFIER('ta' || 'b') AS s ON s.c1 = t.c1
-  WHEN MATCHED UPDATE SET c1 = 3
+  WHEN MATCHED THEN UPDATE SET c1 = 3
 -- !query analysis
-org.apache.spark.sql.catalyst.parser.ParseException
+org.apache.spark.SparkUnsupportedOperationException
 {
-  "errorClass" : "PARSE_SYNTAX_ERROR",
-  "sqlState" : "42601",
+  "errorClass" : "_LEGACY_ERROR_TEMP_2096",
   "messageParameters" : {
-    "error" : "'UPDATE'",
-    "hint" : ""
+    "ddl" : "MERGE INTO TABLE"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
index 6330c1798b9..93e67411172 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
@@ -24,7 +24,7 @@ INSERT INTO IDENTIFIER('ta' || 'b') VALUES(1);
 DELETE FROM IDENTIFIER('ta' || 'b') WHERE 1=0;
 UPDATE IDENTIFIER('ta' || 'b') SET c1 = 2;
 MERGE INTO IDENTIFIER('ta' || 'b') AS t USING IDENTIFIER('ta' || 'b') AS s ON s.c1 = t.c1
-  WHEN MATCHED UPDATE SET c1 = 3;
+  WHEN MATCHED THEN UPDATE SET c1 = 3;
 SELECT * FROM IDENTIFIER('tab');
 SELECT * FROM IDENTIFIER('s.tab');
 SELECT * FROM IDENTIFIER('`s`.`tab`');
diff --git a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
index 8f44095f677..97220606317 100644
--- a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
@@ -127,17 +127,15 @@ org.apache.spark.SparkUnsupportedOperationException
 
 -- !query
 MERGE INTO IDENTIFIER('ta' || 'b') AS t USING IDENTIFIER('ta' || 'b') AS s ON s.c1 = t.c1
-  WHEN MATCHED UPDATE SET c1 = 3
+  WHEN MATCHED THEN UPDATE SET c1 = 3
 -- !query schema
 struct<>
 -- !query output
-org.apache.spark.sql.catalyst.parser.ParseException
+org.apache.spark.SparkUnsupportedOperationException
 {
-  "errorClass" : "PARSE_SYNTAX_ERROR",
-  "sqlState" : "42601",
+  "errorClass" : "_LEGACY_ERROR_TEMP_2096",
   "messageParameters" : {
-    "error" : "'UPDATE'",
-    "hint" : ""
+    "ddl" : "MERGE INTO TABLE"
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
index 2319761f55d..985d0373c4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
@@ -122,6 +122,15 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
       Row(1))
   }
 
+  test("parameter in identifier clause in DDL and utility commands") {
+    spark.sql("CREATE VIEW IDENTIFIER(:p1)(c1) AS SELECT 1", args = Map("p1" -> "v"))
+    spark.sql("ALTER VIEW IDENTIFIER(:p1) AS SELECT 2 AS c1", args = Map("p1" -> "v"))
+    checkAnswer(
+      spark.sql("SHOW COLUMNS FROM IDENTIFIER(:p1)", args = Map("p1" -> "v")),
+      Row("c1"))
+    spark.sql("DROP VIEW IDENTIFIER(:p1)", args = Map("p1" -> "v"))
+  }
+
   test("parameters in INSERT") {
     withTable("t") {
       sql("CREATE TABLE t (col INT) USING json")
@@ -130,7 +139,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("parameters not allowed in DDL commands") {
+  test("parameters not allowed in view body ") {
     val sqlText = "CREATE VIEW v AS SELECT :p AS p"
     val args = Map("p" -> 1)
     checkError(
@@ -138,9 +147,43 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
         spark.sql(sqlText, args)
       },
       errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
-      parameters = Map("statement" -> "CreateView"),
+      parameters = Map("statement" -> "CREATE VIEW body"),
+      context = ExpectedContext(
+        fragment = sqlText,
+        start = 0,
+        stop = sqlText.length - 1))
+  }
+
+  test("parameters not allowed in view body - WITH and scalar subquery") {
+    val sqlText = "CREATE VIEW v AS WITH cte(a) AS (SELECT (SELECT :p) AS a)  SELECT a FROM cte"
+    val args = Map("p" -> 1)
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql(sqlText, args)
+      },
+      errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
+      parameters = Map("statement" -> "CREATE VIEW body"),
+      context = ExpectedContext(
+        fragment = sqlText,
+        start = 0,
+        stop = sqlText.length - 1))
+  }
+
+  test("parameters not allowed in view body - nested WITH and EXIST") {
+    val sqlText =
+      """CREATE VIEW v AS
+        |SELECT a as a
+        |FROM (WITH cte(a) AS (SELECT CASE WHEN EXISTS(SELECT :p) THEN 1 END AS a)
+        |SELECT a FROM cte)""".stripMargin
+    val args = Map("p" -> 1)
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql(sqlText, args)
+      },
+      errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
+      parameters = Map("statement" -> "CREATE VIEW body"),
       context = ExpectedContext(
-        fragment = "CREATE VIEW v AS SELECT :p AS p",
+        fragment = sqlText,
         start = 0,
         stop = sqlText.length - 1))
   }

