Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/07/14 22:57:58 UTC

[GitHub] [spark] dtenedor commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

dtenedor commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1264223983


##########
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?      #createVariable

Review Comment:
   ```suggestion
           multipartIdentifier dataType? variableDefaultExpression?       #createVariable
   ```



##########
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -210,6 +213,9 @@ statement
     | SET TIME ZONE interval                                           #setTimeZone
     | SET TIME ZONE timezone                                           #setTimeZone
     | SET TIME ZONE .*?                                                #setTimeZone
+    | SET (VARIABLE | VAR) assignmentList                             #setVariable

Review Comment:
   ```suggestion
       | SET (VARIABLE | VAR) assignmentList                              #setVariable
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -286,13 +288,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       ResolveFieldNameAndPosition ::
       AddMetadataColumns ::
       DeduplicateRelations ::
-      ResolveReferences ::
+      new ResolveReferences(v1SessionCatalog) ::

Review Comment:
   optional: you can make these `case class`es instead, to avoid the `new` keyword here and below
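   
   Something like this, as a rough sketch (the constructor parameter is just whatever the rule needs):
   
   ```scala
   case class ResolveReferences(catalog: SessionCatalog) extends Rule[LogicalPlan] {
     // ...existing rule body...
   }
   ```
   
   so the rule list can read `ResolveReferences(v1SessionCatalog) ::` without `new`.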



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {

Review Comment:
   once code appears in `Analyzer.scala`, we generally don't ever move it out (to avoid messing up the GitHub blame history). However, `Analyzer.scala` is already huge, at many thousands of lines; could we put this new rule in a separate file from the start?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {

Review Comment:
   please fix the indentation (-2 spaces for each line)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {

Review Comment:
   please fix indentation?
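   
   e.g. the usual style for multi-line parameter lists would be something like:
   
   ```scala
   case class SetVariableExec(
       variables: Seq[Expression],
       query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
   ```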



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)

Review Comment:
   add a brief comment saying what this is?
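   
   For example (wording is just a sketch):
   
   ```scala
   // Pairs a variable's identifier with the StructField carrying its catalog
   // metadata (name, data type, and default-value expression).
   case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
   ```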



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+* A place holder used to hold a reference that has been resolved to a field outside of the current
+* plan. This is used for correlated subqueries.

Review Comment:
   ```suggestion
    * plan. This is used for correlated subqueries.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -201,6 +201,109 @@ object UnresolvedTVFAliases {
   }
 }
 
+/**
+ * Holds the name of a variable (on the left hand side of a SET) that has yet to be resolved.
+ */
+case class UnresolvedVariable(nameParts: Seq[String]) extends Attribute with Unevaluable {
+
+  def name: String =
+    nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+
+  override def exprId: ExprId = throw new UnresolvedException("exprId")
+  override def dataType: DataType = throw new UnresolvedException("dataType")
+  override def nullable: Boolean = throw new UnresolvedException("nullable")
+  override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
+  override lazy val resolved = false
+
+  override def newInstance(): UnresolvedVariable = this
+  override def withNullability(newNullability: Boolean): UnresolvedVariable = this
+  override def withQualifier(newQualifier: Seq[String]): UnresolvedVariable = this
+  override def withName(newName: String): UnresolvedVariable = UnresolvedVariable.quoted(newName)
+  override def withMetadata(newMetadata: Metadata): Attribute = this
+  override def withExprId(newExprId: ExprId): UnresolvedVariable = this
+  override def withDataType(newType: DataType): Attribute = this
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_ATTRIBUTE)
+
+  override def toString: String = s"'$name"
+
+  override def sql: String = nameParts.map(quoteIfNeeded(_)).mkString(".")
+
+  /**
+   * Returns true if this matches the token. This requires the variable to only have one part in
+   * its name and that matches the given token in a case insensitive way.
+   */
+  def equalsIgnoreCase(token: String): Boolean = {
+    nameParts.length == 1 && nameParts.head.equalsIgnoreCase(token)
+  }
+}
+
+object UnresolvedVariable {
+  /**
+   * Creates an [[UnresolvedVariable]], parsing segments separated by dots ('.').
+   */
+  def apply(name: String): UnresolvedVariable =
+    new UnresolvedVariable(CatalystSqlParser.parseMultipartIdentifier(name))
+
+  /**
+   * Creates an [[UnresolvedVariable]], from a single quoted string (for example using backticks in
+   * HiveQL.  Since the string is consider quoted, no processing is done on the name.

Review Comment:
   ```suggestion
      * HiveQL). Since the string is considered quoted, no processing is done on the name.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -527,7 +530,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
-  object ResolveGroupingAnalytics extends Rule[LogicalPlan] {
+  class ResolveGroupingAnalytics(catalog: SessionCatalog) extends Rule[LogicalPlan] {

Review Comment:
   it doesn't appear that this rule uses this newly provided `catalog` anywhere?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {
+                throw  unresolvedVariableError(varIdent, Seq("SESSION"))
+              }
+              VariableReference(varIdent.variableName, varInfo.get._1, canFold = false)
+            case other => other
+          }
+        }
+
+        /**
+         * Protect against duplicate variable names
+         */
+        val varNames = resolvedVars.collect { case variable => variable.toString }
+        val dups = varNames.diff(varNames.distinct).distinct

Review Comment:
   do we need to consider case sensitivity here? Either way, could we leave a brief comment explaining the choice?
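   
   As a sketch of what I mean (assuming the rule's `conf` is the right setting to consult here):
   
   ```scala
   // Normalize names per the analyzer's case-sensitivity setting before diffing,
   // so names differing only in case are still flagged as duplicates.
   val varNames =
     if (conf.caseSensitiveAnalysis) resolvedVars.map(_.toString)
     else resolvedVars.map(_.toString.toLowerCase(java.util.Locale.ROOT))
   val dups = varNames.diff(varNames.distinct).distinct
   ```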



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {

Review Comment:
   you can use
   
   ```
   sessionCatalog.getVariable(varIdent).map { varInfo =>
     VariableReference(varIdent.variableName, varInfo._1, canFold = false)
   }.getOrElse {
     throw unresolvedVariableError(varIdent, Seq("SESSION"))
   }
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3429,7 +3491,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    * Replaces [[UnresolvedDeserializer]] with the deserialization expression that has been resolved
    * to the given input attributes.
    */
-  object ResolveDeserializer extends Rule[LogicalPlan] {
+  class ResolveDeserializer(catalog: SessionCatalog) extends Rule[LogicalPlan] {

Review Comment:
   this `catalog` doesn't appear to get used; can we remove it?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {
+                throw  unresolvedVariableError(varIdent, Seq("SESSION"))

Review Comment:
   ```suggestion
                   throw unresolvedVariableError(varIdent, Seq("SESSION"))
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)

Review Comment:
   you can use
   
   ```
   catalog.getVariable(varIdentifier).map { varInfo =>
     VarInfoObj(varIdentifier, varInfo._2)
   }.getOrElse {
     throw new NoSuchVariableException(varIdentifier.nameParts)
   }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)
+      if (varInfo.isEmpty) {
+        throw new NoSuchVariableException(varIdentifier.nameParts)
+      }
+      VarInfoObj(varIdentifier, varInfo.get._2)
+    }
+    val array = query.executeCollect()
+    if (array.length == 0) {
+      varInfoList foreach(info => catalog.createTempVariable(info.identifier.variableName,
+        Literal(null, info.fieldInfo.dataType),
+        info.fieldInfo.getCurrentDefaultValue().get
+        , overrideIfExists = true))

Review Comment:
   move comma to the end of the previous line?
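   
   i.e. something like:
   
   ```scala
   varInfoList.foreach { info =>
     catalog.createTempVariable(
       info.identifier.variableName,
       Literal(null, info.fieldInfo.dataType),
       info.fieldInfo.getCurrentDefaultValue().get,
       overrideIfExists = true)
   }
   ```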



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)
+      if (varInfo.isEmpty) {
+        throw new NoSuchVariableException(varIdentifier.nameParts)
+      }
+      VarInfoObj(varIdentifier, varInfo.get._2)
+    }
+    val array = query.executeCollect()
+    if (array.length == 0) {
+      varInfoList foreach(info => catalog.createTempVariable(info.identifier.variableName,
+        Literal(null, info.fieldInfo.dataType),
+        info.fieldInfo.getCurrentDefaultValue().get

Review Comment:
   how do we know this `.get` will succeed?
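   
   If it isn't guaranteed by construction (e.g. every declared variable always records a default), a more defensive sketch, assuming a plain exception is acceptable here rather than a proper error class:
   
   ```scala
   val default = info.fieldInfo.getCurrentDefaultValue().getOrElse {
     throw new IllegalStateException(
       s"variable ${info.identifier} has no default expression recorded")
   }
   ```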



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)
+      if (varInfo.isEmpty) {
+        throw new NoSuchVariableException(varIdentifier.nameParts)
+      }
+      VarInfoObj(varIdentifier, varInfo.get._2)
+    }
+    val array = query.executeCollect()
+    if (array.length == 0) {
+      varInfoList foreach(info => catalog.createTempVariable(info.identifier.variableName,
+        Literal(null, info.fieldInfo.dataType),
+        info.fieldInfo.getCurrentDefaultValue().get
+        , overrideIfExists = true))
+    } else if (array.length > 1) {
+      throw QueryExecutionErrors.multipleRowSubqueryError()
+    } else {

Review Comment:
   you can drop the `else` since we throw an exception above?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -201,6 +201,109 @@ object UnresolvedTVFAliases {
   }
 }
 
+/**
+ * Holds the name of a variable (on the left hand side of a SET) that has yet to be resolved.
+ */
+case class UnresolvedVariable(nameParts: Seq[String]) extends Attribute with Unevaluable {
+
+  def name: String =
+    nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+
+  override def exprId: ExprId = throw new UnresolvedException("exprId")
+  override def dataType: DataType = throw new UnresolvedException("dataType")
+  override def nullable: Boolean = throw new UnresolvedException("nullable")
+  override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
+  override lazy val resolved = false
+
+  override def newInstance(): UnresolvedVariable = this
+  override def withNullability(newNullability: Boolean): UnresolvedVariable = this
+  override def withQualifier(newQualifier: Seq[String]): UnresolvedVariable = this
+  override def withName(newName: String): UnresolvedVariable = UnresolvedVariable.quoted(newName)
+  override def withMetadata(newMetadata: Metadata): Attribute = this
+  override def withExprId(newExprId: ExprId): UnresolvedVariable = this
+  override def withDataType(newType: DataType): Attribute = this
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_ATTRIBUTE)
+
+  override def toString: String = s"'$name"
+
+  override def sql: String = nameParts.map(quoteIfNeeded(_)).mkString(".")
+
+  /**
+   * Returns true if this matches the token. This requires the variable to only have one part in
+   * its name and that matches the given token in a case insensitive way.
+   */
+  def equalsIgnoreCase(token: String): Boolean = {
+    nameParts.length == 1 && nameParts.head.equalsIgnoreCase(token)
+  }
+}
+
+object UnresolvedVariable {
+  /**
+   * Creates an [[UnresolvedVariable]], parsing segments separated by dots ('.').
+   */
+  def apply(name: String): UnresolvedVariable =
+    new UnresolvedVariable(CatalystSqlParser.parseMultipartIdentifier(name))
+
+  /**
+   * Creates an [[UnresolvedVariable]], from a single quoted string (for example using backticks in
+   * HiveQL.  Since the string is consider quoted, no processing is done on the name.
+   */
+  def quoted(name: String): UnresolvedVariable = new UnresolvedVariable(Seq(name))
+
+  /**
+   * Creates an [[UnresolvedVariable]] from a string in an embedded language.  In this case
+   * we treat it as a quoted identifier, except for '.', which must be further quoted using
+   * backticks if it is part of a column name.
+   */
+  def quotedString(name: String): UnresolvedVariable =
+    new UnresolvedVariable(parseVariableName(name))
+
+  /**
+   * Used to split attribute name by dot with backticks rule.
+   * Backticks must appear in pairs, and the quoted string must be a complete name part,
+   * which means `ab..c`e.f is not allowed.
+   * We can use backtick only inside quoted name parts.
+   */
+  def parseVariableName(name: String): Seq[String] = {

Review Comment:
   can we have some unit test coverage for this part? the parsing semantics are detailed/tricky.
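   
   e.g. a sketch of the kinds of cases worth covering, assuming a ScalaTest suite and that malformed names throw (the exact exception type here is a guess):
   
   ```scala
   assert(UnresolvedVariable.parseVariableName("a.b.c") === Seq("a", "b", "c"))
   // quoted name parts keep their dots
   assert(UnresolvedVariable.parseVariableName("`a.b`.c") === Seq("a.b", "c"))
   // per the doc comment above, a quoted string must be a complete name part
   intercept[AnalysisException](UnresolvedVariable.parseVariableName("`ab..c`e.f"))
   ```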



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+* A place holder used to hold a reference that has been resolved to a field outside of the current
+* plan. This is used for correlated subqueries.
+*/

Review Comment:
   ```suggestion
    */
   ```



##########
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?      #createVariable

Review Comment:
   Looking at the `dataType`, one of the possibilities is an `identifier`:
   
   ```
   dataType
       : complex=ARRAY LT dataType GT                              #complexDataType
       ...
       | type (LEFT_PAREN INTEGER_VALUE
         (COMMA INTEGER_VALUE)* RIGHT_PAREN)?                      #primitiveDataType
       ;
   
   type
       : BOOLEAN
       ...
       | unsupportedType=identifier
       ;
   ```
   
   So if `DEFAULT` or `EQ` match against this `identifier`, could there be parser ambiguity between the `dataType` and the `variableDefaultExpression`, since both are optional? For example, in `DECLARE v DEFAULT 42`, could the parser consume `DEFAULT` as an `unsupportedType=identifier` data type instead of as the start of the default expression? If so, should we add an explicit check for the ambiguous case(s) to make sure they don't fall through the cracks?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+* A place holder used to hold a reference that has been resolved to a field outside of the current

Review Comment:
   ```suggestion
    * A place holder used to hold a reference that has been resolved to a field outside of the current
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

