You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/09/05 07:01:42 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3376] PlanOnly supports output in different styles

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc60a1536 [KYUUBI #3376] PlanOnly supports output in different styles
cc60a1536 is described below

commit cc60a15363cb698637a55e3b61324ca88a623063
Author: yikf <yi...@gmail.com>
AuthorDate: Mon Sep 5 15:01:31 2022 +0800

    [KYUUBI #3376] PlanOnly supports output in different styles
    
    ### _Why are the changes needed?_
    
    Fix https://github.com/apache/incubator-kyuubi/issues/3376
    
    This pr aims support planOnly output in different styles (Users can perform SQL scheduling based on the JSON description of TreeNode)
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3377 from Yikf/planOnly-support-json.
    
    Closes #3376
    
    b63ae718 [yikf] Support json_description for PlanOnlyStatement
    
    Authored-by: yikf <yi...@gmail.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 docs/deployment/settings.md                        |   1 +
 .../engine/spark/operation/PlanOnlyStatement.scala | 106 +++++++++++++++------
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  28 ++++++
 .../kyuubi/operation/PlanOnlyOperationSuite.scala  |  64 +++++++++++++
 4 files changed, 168 insertions(+), 31 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index cee7e7204..5a0b6ea23 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -407,6 +407,7 @@ kyuubi.operation.language|SQL|Choose a programing language for the following inp
 kyuubi.operation.log.dir.root|server_operation_logs|Root directory for query operation log at server-side.|string|1.4.0
 kyuubi.operation.plan.only.excludes|ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace|Comma-separated list of query plan names, in the form of simple class names, i.e, for `set abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` e.t.c, which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take  [...]
 kyuubi.operation.plan.only.mode|NONE|Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support pl [...]
+kyuubi.operation.plan.only.output.style|PLAIN|Configures the planOnly output style, The value can be 'plain' and 'json', default value is 'plain', this configuration supports only the output styles of the Spark engine|string|1.7.0
 kyuubi.operation.progress.enabled|false|Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`.|boolean|1.6.0
 kyuubi.operation.query.timeout|&lt;undefined&gt;|Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.ope [...]
 kyuubi.operation.result.max.rows|0|Max rows of Spark query results. Rows that exceeds the limit would be ignored. By setting this value to 0 to disable the max rows limit.|int|1.6.0
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index ec36e5204..24151e7c0 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -18,12 +18,14 @@
 package org.apache.kyuubi.engine.spark.operation
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.OPERATION_PLAN_ONLY_EXCLUDES
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE, PlanOnlyStyles}
 import org.apache.kyuubi.config.KyuubiConf.OperationModes._
+import org.apache.kyuubi.config.KyuubiConf.PlanOnlyStyles.{JSON, PLAIN}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -42,6 +44,12 @@ class PlanOnlyStatement(
     spark.conf.getOption(OPERATION_PLAN_ONLY_EXCLUDES.key).map(_.split(",").map(_.trim).toSeq)
       .getOrElse(session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_EXCLUDES))
   }
+
+  private val style = PlanOnlyStyles(spark.conf.get(
+    OPERATION_PLAN_ONLY_OUT_STYLE.key,
+    session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_OUT_STYLE)))
+  spark.conf.set(OPERATION_PLAN_ONLY_OUT_STYLE.key, style.toString)
+
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
   override protected def resultSchema: StructType = {
@@ -60,36 +68,11 @@ class PlanOnlyStatement(
           case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
             result = spark.sql(statement)
             iter = new ArrayFetchIterator(result.collect())
-          case plan => mode match {
-              case PARSE =>
-                iter = new IterableFetchIterator(Seq(Row(plan.toString())))
-              case ANALYZE =>
-                val analyzed = spark.sessionState.analyzer.execute(plan)
-                spark.sessionState.analyzer.checkAnalysis(analyzed)
-                iter = new IterableFetchIterator(Seq(Row(analyzed.toString())))
-              case OPTIMIZE =>
-                val analyzed = spark.sessionState.analyzer.execute(plan)
-                spark.sessionState.analyzer.checkAnalysis(analyzed)
-                val optimized = spark.sessionState.optimizer.execute(analyzed)
-                iter = new IterableFetchIterator(Seq(Row(optimized.toString())))
-              case OPTIMIZE_WITH_STATS =>
-                val analyzed = spark.sessionState.analyzer.execute(plan)
-                spark.sessionState.analyzer.checkAnalysis(analyzed)
-                val optimized = spark.sessionState.optimizer.execute(analyzed)
-                optimized.stats
-                iter = new IterableFetchIterator(Seq(Row(optimized.treeString(
-                  verbose = true,
-                  addSuffix = true,
-                  SQLConf.get.maxToStringFields,
-                  printOperatorId = false))))
-              case PHYSICAL =>
-                val physical = spark.sql(statement).queryExecution.sparkPlan
-                iter = new IterableFetchIterator(Seq(Row(physical.toString())))
-              case EXECUTION =>
-                val executed = spark.sql(statement).queryExecution.executedPlan
-                iter = new IterableFetchIterator(Seq(Row(executed.toString())))
-              case _ =>
-                throw KyuubiSQLException(s"The operation mode $mode" +
+
+          case plan => style match {
+              case PLAIN => explainWithPlainStyle(plan)
+              case JSON => explainWithJsonStyle(plan)
+              case _ => throw KyuubiSQLException(s"The plan only style $style" +
                   " doesn't support in Spark SQL engine.")
             }
         }
@@ -98,4 +81,65 @@ class PlanOnlyStatement(
       onError()
     }
   }
+
+  private def explainWithPlainStyle(plan: LogicalPlan): Unit = {
+    mode match {
+      case PARSE =>
+        iter = new IterableFetchIterator(Seq(Row(plan.toString())))
+      case ANALYZE =>
+        val analyzed = spark.sessionState.analyzer.execute(plan)
+        spark.sessionState.analyzer.checkAnalysis(analyzed)
+        iter = new IterableFetchIterator(Seq(Row(analyzed.toString())))
+      case OPTIMIZE =>
+        val analyzed = spark.sessionState.analyzer.execute(plan)
+        spark.sessionState.analyzer.checkAnalysis(analyzed)
+        val optimized = spark.sessionState.optimizer.execute(analyzed)
+        iter = new IterableFetchIterator(Seq(Row(optimized.toString())))
+      case OPTIMIZE_WITH_STATS =>
+        val analyzed = spark.sessionState.analyzer.execute(plan)
+        spark.sessionState.analyzer.checkAnalysis(analyzed)
+        val optimized = spark.sessionState.optimizer.execute(analyzed)
+        optimized.stats
+        iter = new IterableFetchIterator(Seq(Row(optimized.treeString(
+          verbose = true,
+          addSuffix = true,
+          SQLConf.get.maxToStringFields,
+          printOperatorId = false))))
+      case PHYSICAL =>
+        val physical = spark.sql(statement).queryExecution.sparkPlan
+        iter = new IterableFetchIterator(Seq(Row(physical.toString())))
+      case EXECUTION =>
+        val executed = spark.sql(statement).queryExecution.executedPlan
+        iter = new IterableFetchIterator(Seq(Row(executed.toString())))
+      case _ =>
+        throw KyuubiSQLException(s"The operation mode $mode" +
+          " doesn't support in Spark SQL engine.")
+    }
+  }
+
+  private def explainWithJsonStyle(plan: LogicalPlan): Unit = {
+    mode match {
+      case PARSE =>
+        iter = new IterableFetchIterator(Seq(Row(plan.toJSON)))
+      case ANALYZE =>
+        val analyzed = spark.sessionState.analyzer.execute(plan)
+        spark.sessionState.analyzer.checkAnalysis(analyzed)
+        iter = new IterableFetchIterator(Seq(Row(analyzed.toJSON)))
+      case OPTIMIZE | OPTIMIZE_WITH_STATS =>
+        val analyzed = spark.sessionState.analyzer.execute(plan)
+        spark.sessionState.analyzer.checkAnalysis(analyzed)
+        val optimized = spark.sessionState.optimizer.execute(analyzed)
+        iter = new IterableFetchIterator(Seq(Row(optimized.toJSON)))
+      case PHYSICAL =>
+        val physical = spark.sql(statement).queryExecution.sparkPlan
+        iter = new IterableFetchIterator(Seq(Row(physical.toJSON)))
+      case EXECUTION =>
+        val executed = spark.sql(statement).queryExecution.executedPlan
+        iter = new IterableFetchIterator(Seq(Row(executed.toJSON)))
+      case _ =>
+        throw KyuubiSQLException(s"The operation mode $mode" +
+          " doesn't support in Spark SQL engine.")
+    }
+  }
+
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 0a7e1f35f..c249b8be8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1675,6 +1675,34 @@ object KyuubiConf {
           "and 'none'.")
       .createWithDefault(OperationModes.NONE.toString)
 
+  object PlanOnlyStyles extends Enumeration with Logging {
+    type PlanOnlyStyle = Value
+    val PLAIN, JSON, UNKNOWN = Value
+
+    def apply(mode: String): PlanOnlyStyle = {
+      mode.toUpperCase(Locale.ROOT) match {
+        case "PLAIN" => PLAIN
+        case "JSON" => JSON
+        case other =>
+          warn(s"Unsupported plan only style: $mode, using UNKNOWN instead")
+          UNKNOWN
+      }
+    }
+  }
+
+  val OPERATION_PLAN_ONLY_OUT_STYLE: ConfigEntry[String] =
+    buildConf("kyuubi.operation.plan.only.output.style")
+      .doc("Configures the planOnly output style, The value can be 'plain' and 'json', default " +
+        "value is 'plain', this configuration supports only the output styles of the Spark engine")
+      .version("1.7.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValue(
+        mode => Set("PLAIN", "JSON").contains(mode),
+        "Invalid value for 'kyuubi.operation.plan.only.output.style'. Valid values are " +
+          "'plain', 'json'.")
+      .createWithDefault(PlanOnlyStyles.PLAIN.toString)
+
   val OPERATION_PLAN_ONLY_EXCLUDES: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.operation.plan.only.excludes")
       .doc("Comma-separated list of query plan names, in the form of simple class names, i.e, " +
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index ab2793655..07055efc8 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -22,6 +22,7 @@ import java.sql.{SQLException, Statement}
 import org.apache.kyuubi.WithKyuubiServer
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.OperationModes._
+import org.apache.kyuubi.config.KyuubiConf.PlanOnlyStyles.{JSON, PLAIN}
 
 class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
 
@@ -128,6 +129,69 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
     }
   }
 
+  test("KYUUBI #3376 : Spark physical Plan outputs in plain style") {
+    withSessionConf()(Map(
+      KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> PHYSICAL.toString,
+      KyuubiConf.OPERATION_PLAN_ONLY_OUT_STYLE.key -> PLAIN.toString))(
+      Map.empty) {
+      withJdbcStatement() { statement =>
+        val resultSet = statement.executeQuery(
+          "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")
+        assert(resultSet.next())
+        val operationPlan = resultSet.getString(1)
+        assert(operationPlan.startsWith("Exchange hashpartitioning"))
+      }
+    }
+  }
+
+  test("KYUUBI #3376 : Spark physical Plan outputs in json style") {
+    withSessionConf()(Map(
+      KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> PHYSICAL.toString,
+      KyuubiConf.OPERATION_PLAN_ONLY_OUT_STYLE.key -> JSON.toString))(
+      Map.empty) {
+      withJdbcStatement() { statement =>
+        val resultSet = statement.executeQuery(
+          "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")
+        assert(resultSet.next())
+        val operationPlan = resultSet.getString(1)
+        assert(operationPlan.contains(
+          "\"class\":\"org.apache.spark.sql.execution.exchange.ShuffleExchangeExec\""))
+      }
+    }
+  }
+
+  test("KYUUBI #3376 : Spark optimized Plan outputs in json style") {
+    withSessionConf()(Map(
+      KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> OPTIMIZE.toString,
+      KyuubiConf.OPERATION_PLAN_ONLY_OUT_STYLE.key -> JSON.toString))(
+      Map.empty) {
+      withJdbcStatement() { statement =>
+        val resultSet = statement.executeQuery(
+          "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")
+        assert(resultSet.next())
+        val operationPlan = resultSet.getString(1)
+        assert(operationPlan.contains(
+          "\"class\":\"org.apache.spark.sql.catalyst.plans.logical.LocalRelation\""))
+      }
+    }
+  }
+
+  test("KYUUBI #3376 : Spark parse Plan outputs in json style") {
+    withSessionConf()(Map(
+      KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> PARSE.toString,
+      KyuubiConf.OPERATION_PLAN_ONLY_OUT_STYLE.key -> JSON.toString))(
+      Map.empty) {
+      withJdbcStatement() { statement =>
+        val resultSet = statement.executeQuery(
+          "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")
+        assert(resultSet.next())
+        val operationPlan = resultSet.getString(1)
+        assert(operationPlan.contains(
+          "\"class\":\"org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute\""))
+      }
+    }
+  }
+
   test("kyuubi #3214: Plan only mode with an incorrect value") {
     withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> "parse"))(Map.empty) {
       withJdbcStatement() { statement =>