Posted to reviews@spark.apache.org by "srielau (via GitHub)" <gi...@apache.org> on 2023/03/18 06:13:42 UTC

[GitHub] [spark] srielau opened a new pull request, #40474: [SPARK-42849] [WIP] [SQL] Session Variables

srielau opened a new pull request, #40474:
URL: https://github.com/apache/spark/pull/40474

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   




[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285368915


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -40,7 +42,11 @@ import org.apache.spark.sql.types.StructField
  * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
  *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
  */
-case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+class ResolveColumnDefaultInInsert(catalog: SessionCatalog)

Review Comment:
   shall we rename it to `ResolveColumnDefaultInCommand`?





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1275784972


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -527,7 +530,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
-  object ResolveGroupingAnalytics extends Rule[LogicalPlan] {
+  class ResolveGroupingAnalytics(catalog: SessionCatalog) extends Rule[LogicalPlan] {

Review Comment:
   yea we can remove the change.





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1283632031


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -2471,6 +2496,11 @@
     ],
     "sqlState" : "42883"
   },
+  "ROW_SUBQUERY_TOO_MANY_ROWS" : {
+    "message" : [
+      "More than one row returned by a subquery used as a row."
+    ],
+    "sqlState" : "21000"

Review Comment:
   ```suggestion
       "sqlState" : "21000"
     },
   ```
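
   For context, a sketch of when this error class would surface (assumed final behavior; the statement syntax is taken from the grammar discussed elsewhere in this thread):
   
   ```
   spark.sql("DECLARE VARIABLE v BIGINT")
   // A subquery used as a row source for a variable must return at most one
   // row; two rows should raise ROW_SUBQUERY_TOO_MANY_ROWS (SQLSTATE 21000):
   spark.sql("SET VARIABLE v = (SELECT id FROM range(2))")
   ```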





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [WIP] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1254632343


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],

Review Comment:
   Not a native speaker, but I think the singular is correct here: a list of identifiers, hence `identifierList`.





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285422791


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+ * A place holder used to hold a reference that has been resolved to a field outside of the current
+ * plan. This is used for correlated subqueries.
+ */
+case class VariableReference(varName: String, literal: Literal, canFold: Boolean = true)
+  extends LeafExpression {
+
+  override def dataType: DataType = literal.dataType
+  override def nullable: Boolean = literal.nullable
+  override def prettyName: String = "session." + varName
+  override def foldable: Boolean = canFold
+
+  override def sql: String = varName
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

Review Comment:
   SGTM. `RUNTIME_REPLACEABLE` should only be used for expressions extending `RuntimeReplaceable`





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285369109


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -76,6 +82,25 @@ case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolu
         }
       }
 
+    case s: SetVariable  if s.sourceQuery.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+
+      val expectedQuerySchema = s.targetVariables.map { variable =>
+        variable match {
+          case v: UnresolvedVariable =>
+            val varIdent = VariableIdentifier(v.nameParts)
+            val varInfo = sessionCatalog.getVariable(varIdent)
+            if (!varInfo.isDefined) {
+              throw unresolvedVariableError(varIdent, Seq("SESSION"))

Review Comment:
   how about `SYSTEM.SESSION` to be fully qualified?
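
   For illustration, a fully qualified reference would then look like this (a sketch assuming the `SYSTEM.SESSION` qualifier is adopted; not verified against the final PR):
   
   ```
   spark.sql("DECLARE VARIABLE myvar INT DEFAULT 0")
   spark.sql("SET VARIABLE system.session.myvar = 42")  // fully qualified target
   spark.sql("SELECT system.session.myvar")             // fully qualified reference
   ```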





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285367627


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1710,7 +1722,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         val resolvedNoOuter = partitionExprs.map(resolveExpressionByPlanChildren(_, r))
         val (newPartitionExprs, newChild) = resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
         // Outer reference has lower priority than this. See the doc of `ResolveReferences`.
-        val finalPartitionExprs = newPartitionExprs.map(resolveOuterRef)
+        val resolvedWithOuterRefs = newPartitionExprs.map(resolveOuterRef)

Review Comment:
   ```suggestion
           val resolvedWithOuter = newPartitionExprs.map(resolveOuterRef)
   ```
   
   to match the existing naming style in this rule.





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1286212926


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+ * A place holder used to hold a reference that has been resolved to a field outside of the current
+ * plan. This is used for correlated subqueries.
+ */
+case class VariableReference(varName: String, literal: Literal, canFold: Boolean = true)
+  extends LeafExpression {
+
+  override def dataType: DataType = literal.dataType
+  override def nullable: Boolean = literal.nullable
+  override def prettyName: String = "session." + varName
+  override def foldable: Boolean = canFold
+
+  override def sql: String = varName
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

Review Comment:
   Done





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285380031


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala:
##########
@@ -291,3 +292,7 @@ class NoSuchIndexException private(
       messageParameters = Map.empty[String, String])
   }
 }
+
+class NoSuchVariableException(variable: Seq[String])

Review Comment:
   I was following "when in Rome, do as the Romans do". I can change it wholesale for all new errors.





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1275784429


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala:
##########
@@ -137,6 +137,15 @@ class ViewAlreadyExistsException(errorClass: String, messageParameters: Map[Stri
       messageParameters = Map("relationName" -> ident.quoted))
 }
 
+class VariableAlreadyExistsException(errorClass: String, messageParameters: Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {

Review Comment:
   since we have error class, shall we just throw `AnalysisException`?





[GitHub] [spark] dtenedor commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1264223983


##########
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?      #createVariable

Review Comment:
   ```suggestion
           multipartIdentifier dataType? variableDefaultExpression?       #createVariable
   ```



##########
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -210,6 +213,9 @@ statement
     | SET TIME ZONE interval                                           #setTimeZone
     | SET TIME ZONE timezone                                           #setTimeZone
     | SET TIME ZONE .*?                                                #setTimeZone
+    | SET (VARIABLE | VAR) assignmentList                             #setVariable

Review Comment:
   ```suggestion
       | SET (VARIABLE | VAR) assignmentList                              #setVariable
   ```
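
   For reference, statements this rule would accept (a sketch; the comma-separated multi-assignment form is inferred from `assignmentList` and not verified against the final syntax):
   
   ```
   spark.sql("SET VAR x = 1")
   spark.sql("SET VARIABLE x = 1, y = 2")  // assignmentList: several assignments
   ```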



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -286,13 +288,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       ResolveFieldNameAndPosition ::
       AddMetadataColumns ::
       DeduplicateRelations ::
-      ResolveReferences ::
+      new ResolveReferences(v1SessionCatalog) ::

Review Comment:
   optional: you can make these `case class`es instead, to avoid the `new` keyword here and below
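
   A standalone illustration of the suggestion, with hypothetical rule types (not Spark's actual classes):
   
   ```
   trait Rule { def apply(plan: String): String }
   
   class ResolveA(tag: String) extends Rule { def apply(plan: String): String = s"$tag($plan)" }
   case class ResolveB(tag: String) extends Rule { def apply(plan: String): String = s"$tag($plan)" }
   
   // A case class gets a synthesized companion apply, so `new` is unnecessary:
   val batch: List[Rule] = new ResolveA("a") :: ResolveB("b") :: Nil
   ```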



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {

Review Comment:
   once code appears in `Analyzer.scala`, we generally never move it out (to avoid messing up the GitHub blame history). However, `Analyzer.scala` is already huge, with many thousands of lines; could we put this new rule in a separate file instead?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {

Review Comment:
   please fix the indent (-2 spaces for each line)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {

Review Comment:
   please fix indentation?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)

Review Comment:
   add a brief comment saying what this is?
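
   One possible wording (a sketch, not the PR's final text):
   
   ```
   // Pairs a resolved variable identifier with the catalog's metadata for it
   // (declared type, nullability, and the recorded default expression).
   case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
   ```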



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+* A place holder used to hold a reference that has been resolved to a field outside of the current
+* plan. This is used for correlated subqueries.

Review Comment:
   ```suggestion
    * plan. This is used for correlated subqueries.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -201,6 +201,109 @@ object UnresolvedTVFAliases {
   }
 }
 
+/**
+ * Holds the name of a variable (on the left hand side of a SET) that has yet to be resolved.
+ */
+case class UnresolvedVariable(nameParts: Seq[String]) extends Attribute with Unevaluable {
+
+  def name: String =
+    nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+
+  override def exprId: ExprId = throw new UnresolvedException("exprId")
+  override def dataType: DataType = throw new UnresolvedException("dataType")
+  override def nullable: Boolean = throw new UnresolvedException("nullable")
+  override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
+  override lazy val resolved = false
+
+  override def newInstance(): UnresolvedVariable = this
+  override def withNullability(newNullability: Boolean): UnresolvedVariable = this
+  override def withQualifier(newQualifier: Seq[String]): UnresolvedVariable = this
+  override def withName(newName: String): UnresolvedVariable = UnresolvedVariable.quoted(newName)
+  override def withMetadata(newMetadata: Metadata): Attribute = this
+  override def withExprId(newExprId: ExprId): UnresolvedVariable = this
+  override def withDataType(newType: DataType): Attribute = this
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_ATTRIBUTE)
+
+  override def toString: String = s"'$name"
+
+  override def sql: String = nameParts.map(quoteIfNeeded(_)).mkString(".")
+
+  /**
+   * Returns true if this matches the token. This requires the variable to only have one part in
+   * its name and that matches the given token in a case insensitive way.
+   */
+  def equalsIgnoreCase(token: String): Boolean = {
+    nameParts.length == 1 && nameParts.head.equalsIgnoreCase(token)
+  }
+}
+
+object UnresolvedVariable {
+  /**
+   * Creates an [[UnresolvedVariable]], parsing segments separated by dots ('.').
+   */
+  def apply(name: String): UnresolvedVariable =
+    new UnresolvedVariable(CatalystSqlParser.parseMultipartIdentifier(name))
+
+  /**
+   * Creates an [[UnresolvedVariable]], from a single quoted string (for example using backticks in
+   * HiveQL.  Since the string is consider quoted, no processing is done on the name.

Review Comment:
   ```suggestion
      * HiveQL). Since the string is considered quoted, no processing is done on the name.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -527,7 +530,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
-  object ResolveGroupingAnalytics extends Rule[LogicalPlan] {
+  class ResolveGroupingAnalytics(catalog: SessionCatalog) extends Rule[LogicalPlan] {

Review Comment:
   it doesn't appear that this rule uses this newly provided `catalog` anywhere?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {
+                throw  unresolvedVariableError(varIdent, Seq("SESSION"))
+              }
+              VariableReference(varIdent.variableName, varInfo.get._1, canFold = false)
+            case other => other
+          }
+        }
+
+        /**
+         * Protect against duplicate variable names
+         */
+        val varNames = resolvedVars.collect { case variable => variable.toString }
+        val dups = varNames.diff(varNames.distinct).distinct

Review Comment:
   do we need to consider case sensitivity here? Leave a brief comment?
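
   A standalone sketch of case-insensitive duplicate detection, if that is the intended semantics (an assumption; the code above compares the rendered names directly):
   
   ```
   val varNames = Seq("sales_total", "SALES_TOTAL", "discount")
   val normalized = varNames.map(_.toLowerCase)
   val dups = normalized.diff(normalized.distinct).distinct
   assert(dups == Seq("sales_total"))  // caught only after normalization
   ```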



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {

Review Comment:
   you can use
   
   ```
   sessionCatalog.getVariable(varIdent).map { varInfo =>
     // inside `map` the Option is already unwrapped, so `.get` is not needed
     VariableReference(varIdent.variableName, varInfo._1, canFold = false)
   }.getOrElse {
     throw unresolvedVariableError(varIdent, Seq("SESSION"))
   }
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3429,7 +3491,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    * Replaces [[UnresolvedDeserializer]] with the deserialization expression that has been resolved
    * to the given input attributes.
    */
-  object ResolveDeserializer extends Rule[LogicalPlan] {
+  class ResolveDeserializer(catalog: SessionCatalog) extends Rule[LogicalPlan] {

Review Comment:
   this `catalog` doesn't appear to get used; can we remove it?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {
+                throw  unresolvedVariableError(varIdent, Seq("SESSION"))

Review Comment:
   ```suggestion
                   throw unresolvedVariableError(varIdent, Seq("SESSION"))
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)

Review Comment:
   you can use
   
   ```
   catalog.getVariable(varIdentifier).map { varInfo =>
     VarInfoObj(varIdentifier, varInfo._2)
   }.getOrElse {
     throw new NoSuchVariableException(varIdentifier.nameParts)
   }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)
+      if (varInfo.isEmpty) {
+        throw new NoSuchVariableException(varIdentifier.nameParts)
+      }
+      VarInfoObj(varIdentifier, varInfo.get._2)
+    }
+    val array = query.executeCollect()
+    if (array.length == 0) {
+      varInfoList foreach(info => catalog.createTempVariable(info.identifier.variableName,
+        Literal(null, info.fieldInfo.dataType),
+        info.fieldInfo.getCurrentDefaultValue().get
+        , overrideIfExists = true))

Review Comment:
   move comma to the end of the previous line?
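
   That is, a sketch of the suggested style fix (same behavior, comma moved):
   
   ```
   varInfoList.foreach(info => catalog.createTempVariable(
     info.identifier.variableName,
     Literal(null, info.fieldInfo.dataType),
     info.fieldInfo.getCurrentDefaultValue().get,
     overrideIfExists = true))
   ```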



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)
+      if (varInfo.isEmpty) {
+        throw new NoSuchVariableException(varIdentifier.nameParts)
+      }
+      VarInfoObj(varIdentifier, varInfo.get._2)
+    }
+    val array = query.executeCollect()
+    if (array.length == 0) {
+      varInfoList foreach(info => catalog.createTempVariable(info.identifier.variableName,
+        Literal(null, info.fieldInfo.dataType),
+        info.fieldInfo.getCurrentDefaultValue().get

Review Comment:
   how do we know this `.get` will succeed?
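
   A defensive alternative (a sketch; the exception here is a placeholder, a real fix would presumably use an error-class exception):
   
   ```
   val default = info.fieldInfo.getCurrentDefaultValue().getOrElse {
     throw new IllegalStateException(
       s"variable ${info.identifier} has no recorded default expression")
   }
   ```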



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetVariableExec.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{InternalRow, VariableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchVariableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, VariableReference}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructField
+
+/**
+ * Physical plan node for setting a variable.
+ */
+case class SetVariableExec(variables: Seq[Expression],
+             query: SparkPlan) extends V2CommandExec with UnaryLike[SparkPlan] {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val catalog = session.sessionState.catalog
+
+    case class VarInfoObj(identifier: VariableIdentifier, fieldInfo: StructField)
+
+    val varInfoList = variables.collect { case v: VariableReference =>
+      val varIdentifier = VariableIdentifier(v.varName)
+      val varInfo = catalog.getVariable(varIdentifier)
+      if (varInfo.isEmpty) {
+        throw new NoSuchVariableException(varIdentifier.nameParts)
+      }
+      VarInfoObj(varIdentifier, varInfo.get._2)
+    }
+    val array = query.executeCollect()
+    if (array.length == 0) {
+      varInfoList foreach(info => catalog.createTempVariable(info.identifier.variableName,
+        Literal(null, info.fieldInfo.dataType),
+        info.fieldInfo.getCurrentDefaultValue().get
+        , overrideIfExists = true))
+    } else if (array.length > 1) {
+      throw QueryExecutionErrors.multipleRowSubqueryError()
+    } else {

Review Comment:
   you can drop the `else` since we throw an exception above?
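
   One way to realize the suggestion (a sketch; the throw acts as a guard, so the single-row path needs no trailing `else`):
   
   ```
   if (array.length == 0) {
     // zero rows: reset each variable to NULL of its declared type
   } else {
     if (array.length > 1) {
       throw QueryExecutionErrors.multipleRowSubqueryError()
     }
     // exactly one row from here on: assign the values from array(0)
   }
   ```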



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -201,6 +201,109 @@ object UnresolvedTVFAliases {
   }
 }
 
+/**
+ * Holds the name of a variable (on the left hand side of a SET) that has yet to be resolved.
+ */
+case class UnresolvedVariable(nameParts: Seq[String]) extends Attribute with Unevaluable {
+
+  def name: String =
+    nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+
+  override def exprId: ExprId = throw new UnresolvedException("exprId")
+  override def dataType: DataType = throw new UnresolvedException("dataType")
+  override def nullable: Boolean = throw new UnresolvedException("nullable")
+  override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
+  override lazy val resolved = false
+
+  override def newInstance(): UnresolvedVariable = this
+  override def withNullability(newNullability: Boolean): UnresolvedVariable = this
+  override def withQualifier(newQualifier: Seq[String]): UnresolvedVariable = this
+  override def withName(newName: String): UnresolvedVariable = UnresolvedVariable.quoted(newName)
+  override def withMetadata(newMetadata: Metadata): Attribute = this
+  override def withExprId(newExprId: ExprId): UnresolvedVariable = this
+  override def withDataType(newType: DataType): Attribute = this
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_ATTRIBUTE)
+
+  override def toString: String = s"'$name"
+
+  override def sql: String = nameParts.map(quoteIfNeeded(_)).mkString(".")
+
+  /**
+   * Returns true if this matches the token. This requires the variable to only have one part in
+   * its name and that matches the given token in a case insensitive way.
+   */
+  def equalsIgnoreCase(token: String): Boolean = {
+    nameParts.length == 1 && nameParts.head.equalsIgnoreCase(token)
+  }
+}
+
+object UnresolvedVariable {
+  /**
+   * Creates an [[UnresolvedVariable]], parsing segments separated by dots ('.').
+   */
+  def apply(name: String): UnresolvedVariable =
+    new UnresolvedVariable(CatalystSqlParser.parseMultipartIdentifier(name))
+
+  /**
+   * Creates an [[UnresolvedVariable]], from a single quoted string (for example using backticks in
+   * HiveQL.  Since the string is consider quoted, no processing is done on the name.
+   */
+  def quoted(name: String): UnresolvedVariable = new UnresolvedVariable(Seq(name))
+
+  /**
+   * Creates an [[UnresolvedVariable]] from a string in an embedded language.  In this case
+   * we treat it as a quoted identifier, except for '.', which must be further quoted using
+   * backticks if it is part of a column name.
+   */
+  def quotedString(name: String): UnresolvedVariable =
+    new UnresolvedVariable(parseVariableName(name))
+
+  /**
+   * Used to split attribute name by dot with backticks rule.
+   * Backticks must appear in pairs, and the quoted string must be a complete name part,
+   * which means `ab..c`e.f is not allowed.
+   * We can use backtick only inside quoted name parts.
+   */
+  def parseVariableName(name: String): Seq[String] = {

Review Comment:
   can we have some unit test coverage for this part? the parsing semantics are detailed/tricky.
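
   A sketch of the kind of coverage requested (hypothetical suite and test names; the expected behavior is taken from the doc comment above, and the exception type for the rejected case is an assumption):
   
   ```
   import org.apache.spark.SparkFunSuite
   import org.apache.spark.sql.AnalysisException
   import org.apache.spark.sql.catalyst.analysis.UnresolvedVariable
   
   class ParseVariableNameSuite extends SparkFunSuite {
     test("splits on dots outside backticks") {
       assert(UnresolvedVariable.parseVariableName("a.b.c") === Seq("a", "b", "c"))
       assert(UnresolvedVariable.parseVariableName("`a.b`.c") === Seq("a.b", "c"))
     }
     test("rejects a quoted fragment glued to an unquoted one") {
       // per the doc comment, `ab..c`e.f is not allowed
       intercept[AnalysisException] {
         UnresolvedVariable.parseVariableName("`ab..c`e.f")
       }
     }
   }
   ```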



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+* A place holder used to hold a reference that has been resolved to a field outside of the current
+* plan. This is used for correlated subqueries.
+*/

Review Comment:
   ```suggestion
    */
   ```



##########
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?      #createVariable

Review Comment:
   Looking at the `dataType`, one of the possibilities is an `identifier`:
   
   ```
   dataType
       : complex=ARRAY LT dataType GT                              #complexDataType
       ...
       | type (LEFT_PAREN INTEGER_VALUE
         (COMMA INTEGER_VALUE)* RIGHT_PAREN)?                      #primitiveDataType
       ;
   
   type
       : BOOLEAN
       ...
       | unsupportedType=identifier
       ;
   ```
   
   So if `DEFAULT` or `EQ` match against this, could there be parser ambiguity between the `dataType` and the `variableDefaultExpression`, since both are optional? If so, should we add an explicit check for the ambiguous case(s) to make sure they don't fall through the cracks?
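
   To make the concern concrete, the statement shapes in question (a sketch; whether these actually parse ambiguously is exactly what is being asked):
   
   ```
   spark.sql("DECLARE VARIABLE v INT DEFAULT 5")  // both clauses present: unambiguous
   spark.sql("DECLARE VARIABLE v INT")            // type only
   spark.sql("DECLARE VARIABLE v DEFAULT 5")      // default only: DEFAULT must not be
                                                  // claimed as an unsupportedType identifier
   ```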



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+* A place holder used to hold a reference that has been resolved to a field outside of the current

Review Comment:
   ```suggestion
    * A place holder used to hold a reference that has been resolved to a field outside of the current
   ```





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285340334


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -1217,6 +1236,44 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_METADATA_CATALOG" : {
+    "message" : [
+      "An object in teh metadata catalog has been corrupted:"

Review Comment:
   ```suggestion
         "An object in the metadata catalog has been corrupted:"
   ```





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285368915


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -40,7 +42,11 @@ import org.apache.spark.sql.types.StructField
  * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
  *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
  */
-case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+class ResolveColumnDefaultInInsert(catalog: SessionCatalog)

Review Comment:
   shall we rename it to `ResolveColumnDefaultInCommandInputQuery`?





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1282146917


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {

Review Comment:
   Done





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1284020650


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala:
##########
@@ -137,6 +137,15 @@ class ViewAlreadyExistsException(errorClass: String, messageParameters: Map[Stri
       messageParameters = Map("relationName" -> ident.quoted))
 }
 
+class VariableAlreadyExistsException(errorClass: String, messageParameters: Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {

Review Comment:
   Yeah, people should use the error class name to differentiate errors.





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285341232


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?       #createVariable

Review Comment:
   why can the data type be optional?





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285370162


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -34,6 +35,36 @@ import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegralType, MapType, StructType}
 
 object TableOutputResolver {
+
+  def resolveVariableOutputColumns(
+      expected: Seq[Expression],
+      query: LogicalPlan,
+      conf: SQLConf): LogicalPlan = {
+
+    if (expected.size != query.output.size) {
+      throw new AnalysisException(errorClass = "ASSIGNMENT_ARITY_MISMATCH",
+        messageParameters = Map("numTarget" -> expected.size.toString,
+          "numExpr" -> query.output.size.toString))
+    }
+
+    val resolved: Seq[NamedExpression] = {
+      query.output.zip(expected.asInstanceOf[Seq[VariableReference]]).map {
+        case (inputCol, expected) =>
+        if (DataTypeUtils.sameType(inputCol.dataType, expected.dataType)) {
+          inputCol
+        } else {
+          Alias(cast(inputCol, expected.dataType, conf, expected.varName), expected.varName)()

Review Comment:
   shall we respect the store assignment policy, like INSERT does?





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285370620


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+ * A place holder used to hold a reference that has been resolved to a field outside of the current
+ * plan. This is used for correlated subqueries.
+ */
+case class VariableReference(varName: String, literal: Literal, canFold: Boolean = true)
+  extends LeafExpression {
+
+  override def dataType: DataType = literal.dataType
+  override def nullable: Boolean = literal.nullable
+  override def prettyName: String = "session." + varName
+  override def foldable: Boolean = canFold
+
+  override def sql: String = varName
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

Review Comment:
   It's not a RuntimeReplaceable; we should add a new node pattern, or leave it empty.
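
   A minimal sketch of the two options; VARIABLE_REFERENCE is a hypothetical new TreePattern value that would have to be added to the enumeration first:

   ```scala
   // Option 1: a dedicated pattern for variable references (hypothetical value).
   final override val nodePatterns: Seq[TreePattern] = Seq(VARIABLE_REFERENCE)

   // Option 2: drop the override entirely and inherit TreeNode's default,
   // an empty Seq, so the node simply matches no pattern.
   ```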



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:
##########


Review Comment:
   unnecessary change.





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1275787391


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1708,7 +1719,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         val resolvedNoOuter = partitionExprs.map(resolveExpressionByPlanChildren(_, r))
         val (newPartitionExprs, newChild) = resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
         // Outer reference has lower priority than this. See the doc of `ResolveReferences`.
-        val finalPartitionExprs = newPartitionExprs.map(resolveOuterRef)
+        val resolvedWithOuterExprs = newPartitionExprs.map(resolveOuterRef)

Review Comment:
   ```suggestion
           val resolvedWithOuterRefs = newPartitionExprs.map(resolveOuterRef)
   ```





[GitHub] [spark] melihsozdinler commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "melihsozdinler (via GitHub)" <gi...@apache.org>.
melihsozdinler commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285903634


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -567,6 +568,135 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
   }
 
+  /**
+   * Create a [[CreateVariableCommand]].
+   *
+   * For example:
+   * {{{
+   *   DECLARE [OR REPLACE] [VARIABLE] [db_name.]variable_name
+   *   [dataType] [defaultExpression];
+   * }}}
+   *
+   * We will add CREATE VARIABLE for persisted variable definitions to this, hence the name...
+   */
+  override def visitCreateVariable(ctx: CreateVariableContext): LogicalPlan = withOrigin(ctx) {
+
+    def format(name: String): String = {
+      if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+    }
+
+    val multipartIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
+    val defaultExpression = if (ctx.variableDefaultExpression() == null) {
+      "null"
+    } else {
+      visitVariableDefaultExpression(ctx.variableDefaultExpression())
+    }
+    val dataTypeStr: Option[String] =
+      if (Option(ctx.dataType).nonEmpty) {
+        Option(source(Option(ctx.dataType).get))
+      } else {
+        Option(null)
+      }
+
+    if (multipartIdentifier.length > 3) {
+      throw QueryParsingErrors.unsupportedVariableNameError(
+        multipartIdentifier, ctx.multipartIdentifier)
+    }
+
+    val schemaQualifiedName = if (multipartIdentifier.length < 2) {
+      SESSION_DATABASE +: multipartIdentifier
+    } else {
+      multipartIdentifier
+    }
+
+    val catalogQualifiedName = if (schemaQualifiedName.length < 3) {
+      SYSTEM_CATALOG +: schemaQualifiedName
+    } else {
+      schemaQualifiedName
+    }
+    val variableIdentifier = VariableIdentifier(catalogQualifiedName)
+
+    if (variableIdentifier != VariableIdentifier(
+      Seq(SYSTEM_CATALOG, SESSION_DATABASE, format(catalogQualifiedName.last)))) {
+      throw QueryParsingErrors.unsupportedVariableNameError(
+        multipartIdentifier, ctx.multipartIdentifier)
+    }
+
+    CreateVariableCommand(variableIdentifier, dataTypeStr, defaultExpression, ctx.REPLACE != null)
+  }
+
+  /**
+   * Create a DROP VARIABLE statement.
+   *
+   * For example:
+   * {{{
+   *   DROP TEMPORARY VARIABLE [IF EXISTS] variable;
+   * }}}
+   */
+  override def visitDropVariable(ctx: DropVariableContext): LogicalPlan = withOrigin(ctx) {
+    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
+    if (variableName.length > 3) {
+      throw QueryParsingErrors.unsupportedVariableNameError(variableName, ctx)
+    }
+
+    val variableIdentifier = VariableIdentifier(variableName)
+
+    if (ctx.TEMPORARY() != null) {
+      if (variableIdentifier.database.getOrElse(SESSION_DATABASE) != SESSION_DATABASE ||
+        variableIdentifier.catalog.getOrElse(SYSTEM_CATALOG) != SYSTEM_CATALOG) {
+        throw QueryParsingErrors.unsupportedVariableNameError(variableIdentifier.nameParts, ctx)
+      }
+
+      DropVariableCommand(
+        identifier = VariableIdentifier(Seq(SYSTEM_CATALOG, SESSION_DATABASE,
+          variableIdentifier.variableName)),
+        ifExists = ctx.EXISTS != null)
+    } else {
+      DropVariableCommand(
+        identifier = variableIdentifier,
+        ifExists = ctx.EXISTS != null)
+    }
+  }
+
+  override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
+
+    if (ctx.query() != null) {
+      /**

Review Comment:
   We should use the correct one-line comment style, following the guidance here: https://github.com/databricks/scala-style-guide#documentation-style
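
   A hedged illustration of the style being pointed to; the comment text itself is assumed, not taken from this diff:

   ```scala
   // Inside a method body, prefer line comments like this one; reserve
   // /** ... */ ScalaDoc blocks for documenting public APIs.
   if (ctx.query() != null) {
     // Query form: SET (v1, v2) = (SELECT ...) assigns all targets at once.
   }
   ```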





[GitHub] [spark] melihsozdinler commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "melihsozdinler (via GitHub)" <gi...@apache.org>.
melihsozdinler commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285909119


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -614,6 +632,69 @@ class SessionCatalog(
     new Path(new Path(dbLocation), qualifiedIdent.table).toUri
   }
 
+  // --------------------------------------------------
+  // | Methods that interact with temp variables only |
+  // --------------------------------------------------
+
+  /**
+   * Create a temporary variable.
+   */
+  def createTempVariable(
+                      name: String,
+                      initialValue: Literal,
+                      variableDefault: String,
+                      overrideIfExists: Boolean): Unit = synchronized {
+    val tempVariable = VariableIdentifier(Seq(SYSTEM_CATALOG, SESSION_DATABASE, name))
+    if (variables.contains(tempVariable) && !overrideIfExists) {
+      throw new AnalysisException(errorClass = "VARIABLE_ALREADY_EXISTS",
+        messageParameters = Map("variableName"
+          -> quoteNameParts(UnresolvedAttribute.parseAttributeName(name))))
+    }
+    val structField = StructField(name, initialValue.dataType)
+      .withCurrentDefaultValue(variableDefault)
+    variables.put(tempVariable, (initialValue, structField))
+  }
+
+  /**
+   * Generate a [[Variable]] operator from the temporary variable stored.
+   */
+  def getVariable(variable: VariableIdentifier): Option[(Literal, StructField)] = synchronized {
+    val qualifiedVariable = VariableIdentifier(variable.variableName,
+      Some(variable.database.getOrElse(SESSION_DATABASE)),
+      Some(variable.catalog.getOrElse(SYSTEM_CATALOG)))
+    val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+    val referredTempVariableNames = AnalysisContext.get.referredTempVariableNames
+    if (isResolvingView) {
+      // When resolving a view, only return a temp variable if it's referred by this view.
+      if (referredTempVariableNames.contains(variable.variableName)) {
+        variables.get(qualifiedVariable)
+      } else {
+        None
+      }
+    } else {
+      val result = variables.get(qualifiedVariable)
+      if (result.isDefined) {
+        // We are not resolving a view and the function is a temp one, add it to
+        // `AnalysisContext`, so during the view creation, we can save all referred temp
+        // variables to view metadata.
+        AnalysisContext.get.referredTempVariableNames.add(variable.variableName)
+      }
+      result
+    }
+  }
+
+  /**
+   * Drop a temporary variable.
+   *4

Review Comment:
   This "4" is a typo, right?





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285375918


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -34,6 +35,36 @@ import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegralType, MapType, StructType}
 
 object TableOutputResolver {
+
+  def resolveVariableOutputColumns(
+      expected: Seq[Expression],
+      query: LogicalPlan,
+      conf: SQLConf): LogicalPlan = {
+
+    if (expected.size != query.output.size) {
+      throw new AnalysisException(errorClass = "ASSIGNMENT_ARITY_MISMATCH",
+        messageParameters = Map("numTarget" -> expected.size.toString,
+          "numExpr" -> query.output.size.toString))
+    }
+
+    val resolved: Seq[NamedExpression] = {
+      query.output.zip(expected.asInstanceOf[Seq[VariableReference]]).map {
+        case (inputCol, expected) =>
+        if (DataTypeUtils.sameType(inputCol.dataType, expected.dataType)) {
+          inputCol
+        } else {
+          Alias(cast(inputCol, expected.dataType, conf, expected.varName), expected.varName)()

Review Comment:
   Yes, we shall. How do we do that?
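
   For context, a minimal sketch of one way to do it, patterned on how INSERT resolution dispatches on the policy; the match arms are illustrative, and `cast` is the helper already used in this diff:

   ```scala
   import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy

   // Illustrative only: honor spark.sql.storeAssignmentPolicy when coercing
   // the query output to the variable's declared type, as table INSERT does.
   val coerced = conf.storeAssignmentPolicy match {
     case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
       // ANSI-style cast: raises a runtime error on overflow or bad input.
       cast(inputCol, expected.dataType, conf, expected.varName)
     case StoreAssignmentPolicy.LEGACY =>
       // Legacy cast: permissive, returns null on bad input.
       Cast(inputCol, expected.dataType, Some(conf.sessionLocalTimeZone))
   }
   Alias(coerced, expected.varName)()
   ```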





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1286473766


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -34,6 +35,36 @@ import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegralType, MapType, StructType}
 
 object TableOutputResolver {
+
+  def resolveVariableOutputColumns(
+      expected: Seq[Expression],
+      query: LogicalPlan,
+      conf: SQLConf): LogicalPlan = {
+
+    if (expected.size != query.output.size) {
+      throw new AnalysisException(errorClass = "ASSIGNMENT_ARITY_MISMATCH",
+        messageParameters = Map("numTarget" -> expected.size.toString,
+          "numExpr" -> query.output.size.toString))
+    }
+
+    val resolved: Seq[NamedExpression] = {
+      query.output.zip(expected.asInstanceOf[Seq[VariableReference]]).map {
+        case (inputCol, expected) =>
+        if (DataTypeUtils.sameType(inputCol.dataType, expected.dataType)) {
+          inputCol
+        } else {
+          Alias(cast(inputCol, expected.dataType, conf, expected.varName), expected.varName)()

Review Comment:
   OK, I have changed it to ALWAYS use store assignment and ANSI mode. Given that it's new, that's defensible.
   





[GitHub] [spark] cloud-fan closed pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #40474: [SPARK-42849] [SQL] Session Variables
URL: https://github.com/apache/spark/pull/40474




[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1286213566


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?       #createVariable

Review Comment:
   Fixed





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1282154695


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {

Review Comment:
   Done





[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40474: [SPARK-42849] [WIP] [SQL] Session Variables

Posted by "jaceklaskowski (via GitHub)" <gi...@apache.org>.
jaceklaskowski commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1149130887


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -582,9 +583,29 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
-    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val expression = ctx.expression().toString
-    SetVariableCommand(VariableIdentifier(variableName.last), expression)
+    if (ctx.assignmentList().isEmpty) {
+      SetVariableCommand(Seq(VariableIdentifier("var")), "7")

Review Comment:
   7?! What does this "magic number" represent?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -582,9 +583,29 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
-    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val expression = ctx.expression().toString
-    SetVariableCommand(VariableIdentifier(variableName.last), expression)
+    if (ctx.assignmentList().isEmpty) {
+      SetVariableCommand(Seq(VariableIdentifier("var")), "7")
+    } else {
+      val assignCtx = ctx.assignmentList()
+      val varList = assignCtx.assignment().asScala.map {
+        assign => {

Review Comment:
   nit: no need for `{`. I'd also move `assign` to the line above (as more Scala-idiomatic IMHO)
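
   The suggested shape, applied to the code above (illustrative):

   ```scala
   val varList = assignCtx.assignment().asScala.map { assign =>
     val varname = visitMultipartIdentifier(assign.key)
     VariableIdentifier(varname.head)
   }
   ```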



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>

Review Comment:
   nit: `varInfos`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -582,9 +583,29 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
-    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val expression = ctx.expression().toString
-    SetVariableCommand(VariableIdentifier(variableName.last), expression)
+    if (ctx.assignmentList().isEmpty) {
+      SetVariableCommand(Seq(VariableIdentifier("var")), "7")
+    } else {
+      val assignCtx = ctx.assignmentList()
+      val varList = assignCtx.assignment().asScala.map {
+        assign => {
+          val varname = visitMultipartIdentifier(assign.key)
+          VariableIdentifier(varname.head)
+        }
+      }
+
+      val assignments = assignCtx.assignment()
+      val exprStrList = assignments.asScala.map {
+        assign => {

Review Comment:
   nit: no need for `{`. I'd also move assign to the line above (as more Scala-idiomatic IMHO)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {

Review Comment:
   nit: `map` (not `collect`)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {
+      case variable => Alias(Cast(UnresolvedAttribute(variable._1),

Review Comment:
   Please use destructuring on assignment to avoid these mysterious `_1` and `_2` 🙏 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {

Review Comment:
   ```
   val (_, name) = varInfo.getOrElse { throw ... }
   (identifier.variableName, name)
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {
+      case variable => Alias(Cast(UnresolvedAttribute(variable._1),
+        variable._2.dataType), variable._1)() }
+
+    val parsed = parser.parseQuery(newQueryStr)
+    val subQueryAlias = SubqueryAlias("T", UnresolvedSubqueryColumnAliases(varNames, parsed))
+
+    val analyzed: LogicalPlan = analyzer.execute(Project(casts, subQueryAlias))
+    analyzer.checkAnalysis(analyzed)
+
+    val df = Dataset.ofRows(sparkSession, analyzed)
+    val array = df.collect()
+    if (array.length == 0) {
+    } else if (array.length > 1) {

Review Comment:
   What does this `else if` do? Is it needed?
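
   If the empty branches encode a row-count contract, a sketch that makes it explicit; `assign` is a hypothetical helper standing in for the assignment logic below:

   ```scala
   // Illustrative: spell out the 0 / 1 / many-row cases.
   array.length match {
     case 0 => // no row returned: leave the variables unchanged (assumed intent)
     case 1 => assign(array(0))
     case n => throw new IllegalStateException(s"expected at most one row, got $n")
   }
   ```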



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }

Review Comment:
   ```
   varInfoList.map { case (name, _) => name }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],

Review Comment:
   nit: `identifiers` plural 🙏 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {
+      case variable => Alias(Cast(UnresolvedAttribute(variable._1),
+        variable._2.dataType), variable._1)() }
+
+    val parsed = parser.parseQuery(newQueryStr)
+    val subQueryAlias = SubqueryAlias("T", UnresolvedSubqueryColumnAliases(varNames, parsed))
+
+    val analyzed: LogicalPlan = analyzer.execute(Project(casts, subQueryAlias))
+    analyzer.checkAnalysis(analyzed)
+
+    val df = Dataset.ofRows(sparkSession, analyzed)
+    val array = df.collect()
+    if (array.length == 0) {
+    } else if (array.length > 1) {
+    } else {
+      val row = array(0)
+      (row.toSeq zip varInfoList) map( row => {
+        catalog.createTempVariable(row._2._1, Literal(row._1, row._2._2.dataType),

Review Comment:
   Things got a bit wild here with `_2._1`-like accessors, don't they? 😉 
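
   Pulling these suggestions together, a hedged sketch of the destructured version, reusing names from this diff (`varInfos` is assumed to hold (name, structField) pairs; the last two createTempVariable arguments are assumptions, since the call is cut off above):

   ```scala
   val varNames = varInfos.map { case (name, _) => name }
   val casts = varInfos.map { case (name, field) =>
     Alias(Cast(UnresolvedAttribute(name), field.dataType), name)()
   }
   // ...and when writing the single result row back:
   row.toSeq.zip(varInfos).foreach { case (value, (name, field)) =>
     catalog.createTempVariable(name, Literal(value, field.dataType),
       variableDefault = "null", overrideIfExists = true) // assumed arguments
   }
   ```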





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [WIP] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1254616159


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -582,9 +583,29 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
-    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val expression = ctx.expression().toString
-    SetVariableCommand(VariableIdentifier(variableName.last), expression)
+    if (ctx.assignmentList().isEmpty) {
+      SetVariableCommand(Seq(VariableIdentifier("var")), "7")

Review Comment:
   This was temporary code





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1282246257


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {
+            case v: UnresolvedVariable =>
+              val varIdent = VariableIdentifier(v.nameParts)
+              val varInfo = sessionCatalog.getVariable(varIdent)
+              if (!varInfo.isDefined) {
+                throw  unresolvedVariableError(varIdent, Seq("SESSION"))
+              }
+              VariableReference(varIdent.variableName, varInfo.get._1, canFold = false)
+            case other => other
+          }
+        }
+
+        /**
+         * Protect against duplicate variable names
+         */
+        val varNames = resolvedVars.collect { case variable => variable.toString }
+        val dups = varNames.diff(varNames.distinct).distinct

Review Comment:
   Done. The name is formatted when we create the variable (same as we do for other objects such as schema names).





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1286212692


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala:
##########
@@ -291,3 +292,7 @@ class NoSuchIndexException private(
       messageParameters = Map.empty[String, String])
   }
 }
+
+class NoSuchVariableException(variable: Seq[String])

Review Comment:
   Done





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285874167


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+ * A place holder used to hold a reference that has been resolved to a field outside of the current
+ * plan. This is used for correlated subqueries.
+ */
+case class VariableReference(varName: String, literal: Literal, canFold: Boolean = true)
+  extends LeafExpression {
+
+  override def dataType: DataType = literal.dataType
+  override def nullable: Boolean = literal.nullable
+  override def prettyName: String = "session." + varName
+  override def foldable: Boolean = canFold
+
+  override def sql: String = varName
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

Review Comment:
   But do we still want the same logic? I.e., I would trigger this new tag in the replaceExpression() calls...





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285368174


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -384,7 +443,8 @@ trait ColumnResolutionHelper extends Logging {
       allowOuter = allowOuter)
   }
 
-  def resolveExprInAssignment(expr: Expression, hostPlan: LogicalPlan): Expression = {
+  def resolveExprInAssignment(expr: Expression, hostPlan: LogicalPlan):
+  Expression = {

Review Comment:
   unnecessary change.





[GitHub] [spark] srielau commented on pull request #40474: [SPARK-42849] [WIP] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on PR #40474:
URL: https://github.com/apache/spark/pull/40474#issuecomment-1636400037

   @cloud-fan @dtenedor @gengliangwang I'm off for a week. This PR is in good enough shape to get a once-over.




[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1282148744


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(COMMAND), ruleId) {
+      case setVariable: SetVariable
+        if setVariable.sourceQuery.resolved && !setVariable.targetVariables.forall(_.resolved) =>
+
+        /**
+         * Resolve the left hand side of the SET
+         */
+        val resolvedVars = setVariable.targetVariables.map { variable =>
+            variable match {

Review Comment:
   Done





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285376081


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+ * A place holder used to hold a reference that has been resolved to a field outside of the current
+ * plan. This is used for correlated subqueries.
+ */
+case class VariableReference(varName: String, literal: Literal, canFold: Boolean = true)
+  extends LeafExpression {
+
+  override def dataType: DataType = literal.dataType
+  override def nullable: Boolean = literal.nullable
+  override def prettyName: String = "session." + varName
+  override def foldable: Boolean = canFold
+
+  override def sql: String = varName
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

Review Comment:
   It's not?





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285367230


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?       #createVariable

Review Comment:
   ah I see, but at least one of them should be specified.
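
   For the record, the resulting contract as a minimal illustrative snippet (`spark` is an assumed SparkSession):

   ```scala
   spark.sql("DECLARE VARIABLE v1 INT")            // type only: initialized to NULL
   spark.sql("DECLARE VARIABLE v2 DEFAULT 42")     // default only: type inferred
   spark.sql("DECLARE VARIABLE v3 INT DEFAULT 42") // both
   // Neither a type nor a default: rejected, at least one must be given.
   ```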





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285367745


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1725,7 +1738,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         val resolvedWithAgg = resolveColWithAgg(resolvedNoOuter, child)
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(resolvedWithAgg), child)
         // Outer reference has lowermost priority. See the doc of `ResolveReferences`.
-        val finalCond = resolveOuterRef(newCond.head)
+        val withOuterCond = resolveOuterRef(newCond.head)

Review Comment:
   ```suggestion
           val resolvedWithOuter = resolveOuterRef(newCond.head)
   ```
   
   ditto





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1275785477


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3315,6 +3329,54 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Insert casts when data types do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveSetVariable extends Rule[LogicalPlan] {

Review Comment:
   +1





[GitHub] [spark] melihsozdinler commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "melihsozdinler (via GitHub)" <gi...@apache.org>.
melihsozdinler commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285899298


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -567,6 +568,135 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
   }
 
+  /**
+   * Create a [[CreateVariableCommand]].
+   *
+   * For example:
+   * {{{
+   *   DECLARE [OR REPLACE] [VARIABLE] [db_name.]variable_name
+   *   [dataType] [defaultExpression];
+   * }}}
+   *
+   * We will add CREATE VARIABLE for persisted variable definitions to this, hence the name...
+   */
+  override def visitCreateVariable(ctx: CreateVariableContext): LogicalPlan = withOrigin(ctx) {
+
+    def format(name: String): String = {
+      if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+    }
+
+    val multipartIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
+    val defaultExpression = if (ctx.variableDefaultExpression() == null) {
+      "null"
+    } else {
+      visitVariableDefaultExpression(ctx.variableDefaultExpression())
+    }
+    val dataTypeStr: Option[String] =
+      if (Option(ctx.dataType).nonEmpty) {
+        Option(source(Option(ctx.dataType).get))
+      } else {
+        Option(null)
+      }
+
+    if (multipartIdentifier.length > 3) {
+      throw QueryParsingErrors.unsupportedVariableNameError(
+        multipartIdentifier, ctx.multipartIdentifier)
+    }
+
+    val schemaQualifiedName = if (multipartIdentifier.length < 2) {
+      SESSION_DATABASE +: multipartIdentifier
+    } else {
+      multipartIdentifier
+    }
+
+    val catalogQualifiedName = if (schemaQualifiedName.length < 3) {

Review Comment:
   Is this "3" connected to the length-3 check in the VariableIdentifier apply method? If they are connected, there should be a comment noting that the apply function must be changed as well, or we can avoid such magic-number issues by introducing a meaningfully named constant.
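
   A hedged sketch of the named-constant alternative; the constant name is hypothetical:

   ```scala
   // Illustrative: one shared constant instead of a repeated magic number.
   // catalog.schema.variable => at most three name parts.
   val MAX_VARIABLE_NAME_PARTS = 3

   if (multipartIdentifier.length > MAX_VARIABLE_NAME_PARTS) {
     throw QueryParsingErrors.unsupportedVariableNameError(
       multipartIdentifier, ctx.multipartIdentifier)
   }
   ```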





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285368645


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala:
##########
@@ -291,3 +292,7 @@ class NoSuchIndexException private(
       messageParameters = Map.empty[String, String])
   }
 }
+
+class NoSuchVariableException(variable: Seq[String])

Review Comment:
   Sorry, I may have missed the earlier discussion on this. So we will keep adding new exception types even though we now have unique error-class names?
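
   For context, the alternative would be to throw the error class directly instead of adding a subclass. A minimal sketch, assuming a hypothetical error-class name; the constructor shape matches the AnalysisException(errorClass, messageParameters) calls used elsewhere in this PR:

       import org.apache.spark.sql.AnalysisException

       def noSuchVariableError(variable: Seq[String]): Throwable =
         new AnalysisException(
           errorClass = "VARIABLE_NOT_FOUND",  // hypothetical error-class name
           messageParameters = Map("variableName" -> variable.mkString(".")))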





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1284021061


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?

Review Comment:
   ok sounds good





[GitHub] [spark] cloud-fan commented on pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40474:
URL: https://github.com/apache/spark/pull/40474#issuecomment-1670928360

   thanks, merging to master!




[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1275786758


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1601,14 +1611,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         // Lateral column alias has higher priority than outer reference.
         val resolvedWithLCA = resolveLateralColumnAlias(resolvedNoOuter)
         val resolvedWithOuter = resolvedWithLCA.map(resolveOuterRef)
-        p.copy(projectList = resolvedWithOuter.map(_.asInstanceOf[NamedExpression]))
+        val resolvedWithVariables = resolvedWithOuter.map(p => resolveVariables(p))

Review Comment:
   unnecessary change





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285376611


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -429,6 +429,39 @@ case class OuterReference(e: NamedExpression)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
 }
 
+/**
+ * A placeholder holding a reference that has been resolved to a session variable,
+ * carrying the variable's value as a literal.
+ */
+case class VariableReference(varName: String, literal: Literal, canFold: Boolean = true)
+  extends LeafExpression {
+
+  override def dataType: DataType = literal.dataType
+  override def nullable: Boolean = literal.nullable
+  override def prettyName: String = "session." + varName
+  override def foldable: Boolean = canFold
+
+  override def sql: String = varName
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

Review Comment:
   OK, how about: VARIABLE_REFERENCE?
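
   For context, the benefit of a dedicated pattern is that rules can prune subtrees cheaply. A minimal sketch, assuming a VARIABLE_REFERENCE value were added to the TreePattern enum and imported:

       import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

       // Skip any plan that provably contains no variable references,
       // then substitute each reference with its captured literal value.
       def inlineVariables(plan: LogicalPlan): LogicalPlan =
         plan.transformAllExpressionsWithPruning(
             _.containsPattern(VARIABLE_REFERENCE)) {
           case v: VariableReference => v.literal
         }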





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285379541


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?       #createVariable

Review Comment:
   Ugh, I didn't block omitting both, which produces a void declaration. Maybe a bad idea; will fix.





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1286477051


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -567,6 +568,135 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
   }
 
+  /**
+   * Create a [[CreateVariableCommand]].
+   *
+   * For example:
+   * {{{
+   *   DECLARE [OR REPLACE] [VARIABLE] [db_name.]variable_name
+   *   [dataType] [defaultExpression];
+   * }}}
+   *
+   * We will add CREATE VARIABLE for persisted variable definitions to this, hence the name...
+   */
+  override def visitCreateVariable(ctx: CreateVariableContext): LogicalPlan = withOrigin(ctx) {
+
+    def format(name: String): String = {
+      if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+    }
+
+    val multipartIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
+    val defaultExpression = if (ctx.variableDefaultExpression() == null) {
+      "null"
+    } else {
+      visitVariableDefaultExpression(ctx.variableDefaultExpression())
+    }
+    val dataTypeStr: Option[String] = Option(ctx.dataType).map(source)
+
+    if (multipartIdentifier.length > 3) {
+      throw QueryParsingErrors.unsupportedVariableNameError(
+        multipartIdentifier, ctx.multipartIdentifier)
+    }
+
+    val schemaQualifiedName = if (multipartIdentifier.length < 2) {
+      SESSION_DATABASE +: multipartIdentifier
+    } else {
+      multipartIdentifier
+    }
+
+    val catalogQualifiedName = if (schemaQualifiedName.length < 3) {

Review Comment:
   Added a comment that we need to pad to system.session.varname
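
   The padding rule itself is small; a standalone sketch of the two steps in the quoted code (the constant values mirror SYSTEM_CATALOG and SESSION_DATABASE above):

       val SYSTEM_CATALOG = "system"
       val SESSION_DATABASE = "session"

       // Pad a one- or two-part variable name up to system.session.<name>.
       def padVariableName(parts: Seq[String]): Seq[String] = parts match {
         case Seq(name)     => Seq(SYSTEM_CATALOG, SESSION_DATABASE, name)
         case Seq(db, name) => Seq(SYSTEM_CATALOG, db, name)
         case full          => full // already fully qualified (three parts)
       }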





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1284021318


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1601,14 +1611,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         // Lateral column alias has higher priority than outer reference.
         val resolvedWithLCA = resolveLateralColumnAlias(resolvedNoOuter)
         val resolvedWithOuter = resolvedWithLCA.map(resolveOuterRef)
-        p.copy(projectList = resolvedWithOuter.map(_.asInstanceOf[NamedExpression]))
+        val resolvedWithVariables = resolvedWithOuter.map(p => resolveVariables(p))

Review Comment:
   sorry I misread the code





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285360939


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?       #createVariable

Review Comment:
   We can derive it from the default. So you can do:
   DECLARE var = 'hello';
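
   A usage sketch of that behavior, assuming a running SparkSession named `spark` with this feature available (the variable's type is derived from its default):

       spark.sql("DECLARE var = 'hello'")         // type derived as STRING
       spark.sql("DECLARE counter INT DEFAULT 0") // type stated explicitly
       spark.sql("SELECT var").show()             // -> hello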
   





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285360732


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -1217,6 +1236,44 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_METADATA_CATALOG" : {
+    "message" : [
+      "An object in teh metadata catalog has been corrupted:"

Review Comment:
   Fixed





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1285372794


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?
+        multipartIdentifier dataType? variableDefaultExpression?       #createVariable

Review Comment:
   Yes





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1284452392


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala:
##########
@@ -137,6 +137,15 @@ class ViewAlreadyExistsException(errorClass: String, messageParameters: Map[Stri
       messageParameters = Map("relationName" -> ident.quoted))
 }
 
+class VariableAlreadyExistsException(errorClass: String, messageParameters: Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {

Review Comment:
   Fixed





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1286243421


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -614,6 +632,69 @@ class SessionCatalog(
     new Path(new Path(dbLocation), qualifiedIdent.table).toUri
   }
 
+  // --------------------------------------------------
+  // | Methods that interact with temp variables only |
+  // --------------------------------------------------
+
+  /**
+   * Create a temporary variable.
+   */
+  def createTempVariable(
+      name: String,
+      initialValue: Literal,
+      variableDefault: String,
+      overrideIfExists: Boolean): Unit = synchronized {
+    val tempVariable = VariableIdentifier(Seq(SYSTEM_CATALOG, SESSION_DATABASE, name))
+    if (variables.contains(tempVariable) && !overrideIfExists) {
+      throw new AnalysisException(errorClass = "VARIABLE_ALREADY_EXISTS",
+        messageParameters = Map("variableName"
+          -> quoteNameParts(UnresolvedAttribute.parseAttributeName(name))))
+    }
+    val structField = StructField(name, initialValue.dataType)
+      .withCurrentDefaultValue(variableDefault)
+    variables.put(tempVariable, (initialValue, structField))
+  }
+
+  /**
+   * Generate a [[Variable]] operator from the temporary variable stored.
+   */
+  def getVariable(variable: VariableIdentifier): Option[(Literal, StructField)] = synchronized {
+    val qualifiedVariable = VariableIdentifier(variable.variableName,
+      Some(variable.database.getOrElse(SESSION_DATABASE)),
+      Some(variable.catalog.getOrElse(SYSTEM_CATALOG)))
+    val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+    val referredTempVariableNames = AnalysisContext.get.referredTempVariableNames
+    if (isResolvingView) {
+      // When resolving a view, only return a temp variable if it's referred by this view.
+      if (referredTempVariableNames.contains(variable.variableName)) {
+        variables.get(qualifiedVariable)
+      } else {
+        None
+      }
+    } else {
+      val result = variables.get(qualifiedVariable)
+      if (result.isDefined) {
+        // We are not resolving a view and the variable is a temp one; add it to
+        // `AnalysisContext` so that, during view creation, we can save all referred
+        // temp variables to the view metadata.
+        AnalysisContext.get.referredTempVariableNames.add(variable.variableName)
+      }
+      result
+    }
+  }
+
+  /**
+   * Drop a temporary variable.
+   *4

Review Comment:
   Or passing secret notes.. who's to tell
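
   Joking aside, an end-to-end sketch of the lifecycle these SessionCatalog methods back, at the SQL surface (assuming a running SparkSession named `spark`; treat the exact statement keywords as assumptions):

       spark.sql("DECLARE OR REPLACE VARIABLE greeting STRING DEFAULT 'hi'")
       spark.sql("SET VAR greeting = 'hello'")        // update the variable
       spark.sql("SELECT greeting").show()            // -> hello
       spark.sql("DROP TEMPORARY VARIABLE greeting")  // remove it again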





[GitHub] [spark] srielau commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1282159625


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3429,7 +3491,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    * Replaces [[UnresolvedDeserializer]] with the deserialization expression that has been resolved
    * to the given input attributes.
    */
-  object ResolveDeserializer extends Rule[LogicalPlan] {
+  class ResolveDeserializer(catalog: SessionCatalog) extends Rule[LogicalPlan] {

Review Comment:
   Done





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1275782745


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -165,7 +165,10 @@ statement
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         identifierReference AS className=stringLit
         (USING resource (COMMA resource)*)?                            #createFunction
-    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference      #dropFunction
+    | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference        #dropFunction
+    | DECLARE (OR REPLACE)? VARIABLE?

Review Comment:
   Is it more future-proof to require `VARIABLE`?
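
   For illustration, with the keyword optional both forms below would parse to the same #createVariable alternative; requiring `VARIABLE` would reject the second (assuming a running SparkSession named `spark`):

       spark.sql("DECLARE VARIABLE v1 INT DEFAULT 1")  // keyword present
       spark.sql("DECLARE v2 INT DEFAULT 2")           // keyword omitted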





[GitHub] [spark] cloud-fan commented on a diff in pull request #40474: [SPARK-42849] [SQL] Session Variables

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1275785833


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1459,7 +1462,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    *       previous options are permanently not applicable. If the current option can be applicable
    *       in the next iteration (other rules update the plan), we should not try the next option.
    */
-  object ResolveReferences extends Rule[LogicalPlan] with ColumnResolutionHelper {
+  class ResolveReferences(catalog: SessionCatalog)
+    extends Rule[LogicalPlan] with ColumnResolutionHelper {
+
+    def sessionCatalog: SessionCatalog = catalog

Review Comment:
   do we need this method?


