Posted to reviews@spark.apache.org by "jaceklaskowski (via GitHub)" <gi...@apache.org> on 2023/03/27 10:58:55 UTC

[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40474: [SPARK-42849] [WIP] [SQL] Session Variables

jaceklaskowski commented on code in PR #40474:
URL: https://github.com/apache/spark/pull/40474#discussion_r1149130887


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -582,9 +583,29 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
-    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val expression = ctx.expression().toString
-    SetVariableCommand(VariableIdentifier(variableName.last), expression)
+    if (ctx.assignmentList().isEmpty) {
+      SetVariableCommand(Seq(VariableIdentifier("var")), "7")

Review Comment:
   7?! What does this "magic number" represent?
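
   If it's a placeholder for the not-yet-implemented "no assignment list" case of this WIP, naming or documenting it would make that readable, e.g. (sketch only):
   ```
   // TODO(WIP): document or remove this placeholder
   val placeholderValue = "7" // why 7?
   SetVariableCommand(Seq(VariableIdentifier("var")), placeholderValue)
   ```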



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -582,9 +583,29 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
-    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val expression = ctx.expression().toString
-    SetVariableCommand(VariableIdentifier(variableName.last), expression)
+    if (ctx.assignmentList().isEmpty) {
+      SetVariableCommand(Seq(VariableIdentifier("var")), "7")
+    } else {
+      val assignCtx = ctx.assignmentList()
+      val varList = assignCtx.assignment().asScala.map {
+        assign => {

Review Comment:
   nit: no need for `{`. I'd also move `assign` to the line above (as more Scala-idiomatic IMHO)
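
   For example, a sketch of the shape I have in mind (same logic, just reshaped):
   ```
   val varList = assignCtx.assignment().asScala.map { assign =>
     val varname = visitMultipartIdentifier(assign.key)
     VariableIdentifier(varname.head)
   }
   ```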



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>

Review Comment:
   nit: `varInfos`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -582,9 +583,29 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   override def visitSetVariable(ctx: SetVariableContext): LogicalPlan = withOrigin(ctx) {
-    val variableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val expression = ctx.expression().toString
-    SetVariableCommand(VariableIdentifier(variableName.last), expression)
+    if (ctx.assignmentList().isEmpty) {
+      SetVariableCommand(Seq(VariableIdentifier("var")), "7")
+    } else {
+      val assignCtx = ctx.assignmentList()
+      val varList = assignCtx.assignment().asScala.map {
+        assign => {
+          val varname = visitMultipartIdentifier(assign.key)
+          VariableIdentifier(varname.head)
+        }
+      }
+
+      val assignments = assignCtx.assignment()
+      val exprStrList = assignments.asScala.map {
+        assign => {

Review Comment:
   nit: no need for `{`. I'd also move `assign` to the line above (as more Scala-idiomatic IMHO)
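
   Same reshaping as above, e.g.:
   ```
   val exprStrList = assignments.asScala.map { assign =>
     // ...body as before...
   }
   ```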



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {

Review Comment:
   nit: `map` (not `collect`)
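
   (A `collect` whose single `case` matches every element is just a `map`; `collect` signals that some elements may be filtered out, which is not the case here.)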



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {
+      case variable => Alias(Cast(UnresolvedAttribute(variable._1),

Review Comment:
   Please use destructuring on assignment to avoid these mysterious `_1` and `_2` 🙏 
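
   For example (assuming the second tuple element is the variable definition that exposes `dataType`):
   ```
   val casts = varInfoList.map { case (name, varDef) =>
     Alias(Cast(UnresolvedAttribute(name), varDef.dataType), name)()
   }
   ```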



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {

Review Comment:
   ```
   val (_, name) = varInfo.getOrElse { throw ... }
   (identifier.variableName, name)
   ```
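
   (`getOrElse` takes its argument by name, so the `throw` only runs when `varInfo` is empty.) Spelled out, again assuming the second tuple element is the variable definition:
   ```
   val (_, varDef) = varInfo.getOrElse {
     throw new NoSuchTempVariableException(identifier.variableName)
   }
   (identifier.variableName, varDef)
   ```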



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {
+      case variable => Alias(Cast(UnresolvedAttribute(variable._1),
+        variable._2.dataType), variable._1)() }
+
+    val parsed = parser.parseQuery(newQueryStr)
+    val subQueryAlias = SubqueryAlias("T", UnresolvedSubqueryColumnAliases(varNames, parsed))
+
+    val analyzed: LogicalPlan = analyzer.execute(Project(casts, subQueryAlias))
+    analyzer.checkAnalysis(analyzed)
+
+    val df = Dataset.ofRows(sparkSession, analyzed)
+    val array = df.collect()
+    if (array.length == 0) {
+    } else if (array.length > 1) {

Review Comment:
   What does this `else if` do? Is it needed?
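
   If the empty branches are intentional, a `match` with explicit comments (or errors) would document the intent, e.g. (sketch only):
   ```
   array.length match {
     case 0 => // no rows: leave the variable unchanged? set it to NULL?
     case 1 => // the single-row assignment below
     case _ => // more than one row: should this be an error?
   }
   ```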



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }

Review Comment:
   ```
   varInfoList.map { case (name, _) => name }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],

Review Comment:
   nit: `identifiers` plural 🙏 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/variables.scala:
##########
@@ -119,13 +116,45 @@ case class DropVariableCommand(
 /**
  * This command is for setting a SQL variable.
  */
-case class SetVariableCommand(identifier: VariableIdentifier,
-                              newExpr: String) extends LeafRunnableCommand {
+case class SetVariableCommand(identifierList: Seq[VariableIdentifier],
+                              newQueryStr: String) extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val parser = sparkSession.sessionState.sqlParser
+    val analyzer = sparkSession.sessionState.analyzer
     val catalog = sparkSession.sessionState.catalog
-    assert(identifier.database.isEmpty)
-    val varInfo = catalog.getTempVariable(identifier.variableName)
+    val varInfoList = identifierList.collect { case identifier: VariableIdentifier =>
+      assert(identifier.database.isEmpty)
+      val varInfo = catalog.getTempVariable(identifier.variableName)
+      if (varInfo.isEmpty) {
+        throw new NoSuchTempVariableException(identifier.variableName)
+      }
+      (identifier.variableName, varInfo.get._2)
+    }
+
+    val varNames = varInfoList.collect { case variable => variable._1 }
+    val casts = varInfoList.collect {
+      case variable => Alias(Cast(UnresolvedAttribute(variable._1),
+        variable._2.dataType), variable._1)() }
+
+    val parsed = parser.parseQuery(newQueryStr)
+    val subQueryAlias = SubqueryAlias("T", UnresolvedSubqueryColumnAliases(varNames, parsed))
+
+    val analyzed: LogicalPlan = analyzer.execute(Project(casts, subQueryAlias))
+    analyzer.checkAnalysis(analyzed)
+
+    val df = Dataset.ofRows(sparkSession, analyzed)
+    val array = df.collect()
+    if (array.length == 0) {
+    } else if (array.length > 1) {
+    } else {
+      val row = array(0)
+      (row.toSeq zip varInfoList) map( row => {
+        catalog.createTempVariable(row._2._1, Literal(row._1, row._2._2.dataType),

Review Comment:
   Things got a bit wild here with `_2._1`-like accessors, don't they? 😉 
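
   A destructured sketch (assuming the zipped tuples are `(value, (name, definition))`; the arguments truncated in the diff stay as before, and `foreach` probably fits better than `map` since the result looks unused):
   ```
   row.toSeq.zip(varInfoList).foreach { case (value, (name, varDef)) =>
     catalog.createTempVariable(name, Literal(value, varDef.dataType), ...)
   }
   ```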



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org