You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/02/23 09:14:43 UTC

[kyuubi] branch branch-1.7 updated (b59ecb468 -> 0886ce1a7)

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a change to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


    omit b59ecb468 [KYUUBI #4402] [ARROW] Make arrow-based query metrics trackable in SQL UI
    omit 396db8747 [KYUUBI #3957][FOLLOWUP][1.7] Rename the config prefix from kyuubi.server.batch.limit to kyuubi.server.limit.batch
     new 0886ce1a7 [KYUUBI #4402] [ARROW] Make arrow-based query metrics trackable in SQL UI

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (b59ecb468)
            \
             N -- N -- N   refs/heads/branch-1.7 (0886ce1a7)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/deployment/settings.md                                         | 6 +++---
 .../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala        | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)


[kyuubi] 01/01: [KYUUBI #4402] [ARROW] Make arrow-based query metrics trackable in SQL UI

Posted by fe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git

commit 0886ce1a70c5f2d9075fa9751754a0c879a27482
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Thu Feb 23 17:04:29 2023 +0800

    [KYUUBI #4402] [ARROW] Make arrow-based query metrics trackable in SQL UI
    
    ### _Why are the changes needed?_
    
    Currently, the SQL metrics are missing from the SQL UI tab, this is because we mistakenly bound QueryExecution in [PR-4392](https://github.com/apache/kyuubi/pull/4392), before this PR, it was `resultDF.queryExecution` that was bound to `SQLExecution.withNewExecutionId()`, But the executed Dataset is `resultDF.select(cols: _*)`, this PR passed the correct QueryExecution `resultDF.select(cols: _*).queryExecution` to solve this problem.
    
    ```sql
    set kyuubi.operation.result.format=arrow;
    select 1;
    ```
    
    Before this PR:
    
    ![截屏2023-02-23 下午1 47 34](https://user-images.githubusercontent.com/8537877/220832155-4277ccf7-1cfe-40db-a6e5-e1ed4a6d2e29.png)
    
    After this PR:
    
    ![截屏2023-02-23 下午2 07 23](https://user-images.githubusercontent.com/8537877/220832184-b9871b4b-f408-42ac-91ca-30e5cd503b24.png)
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4402 from cfmcgrady/arrow-metrics.
    
    Closes #4402
    
    e0cde3b1 [Fu Chen] fix style
    b35cbfdc [Fu Chen] fix
    542414ef [Fu Chen] make arrow-based query metrics trackable in SQL UI
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 3016f431bf0d4b881d8a1979aa13b23432a58468)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/spark/operation/ExecuteStatement.scala  | 79 ++++++++++++----------
 .../operation/SparkArrowbasedOperationSuite.scala  | 51 ++++++++++++--
 2 files changed, 88 insertions(+), 42 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 6ebcce377..fa517a8b1 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.kyuubi.SparkDatasetHelper
@@ -75,29 +76,6 @@ class ExecuteStatement(
     resultDF.take(maxRows)
   }
 
-  protected def collectAsIterator(resultDF: DataFrame): FetchIterator[_] = {
-    val resultMaxRows = spark.conf.getOption(OPERATION_RESULT_MAX_ROWS.key).map(_.toInt)
-      .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_MAX_ROWS))
-    if (incrementalCollect) {
-      if (resultMaxRows > 0) {
-        warn(s"Ignore ${OPERATION_RESULT_MAX_ROWS.key} on incremental collect mode.")
-      }
-      info("Execute in incremental collect mode")
-      new IterableFetchIterator[Any](new Iterable[Any] {
-        override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
-      })
-    } else {
-      val internalArray = if (resultMaxRows <= 0) {
-        info("Execute in full collect mode")
-        fullCollectResult(resultDF)
-      } else {
-        info(s"Execute with max result rows[$resultMaxRows]")
-        takeResult(resultDF, resultMaxRows)
-      }
-      new ArrayFetchIterator(internalArray)
-    }
-  }
-
   protected def executeStatement(): Unit = withLocalProperties {
     try {
       setState(OperationState.RUNNING)
@@ -163,14 +141,33 @@ class ExecuteStatement(
     }
   }
 
-  def convertComplexType(df: DataFrame): DataFrame = {
-    SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df, timestampAsString)
-  }
-
   override def getResultSetMetadataHints(): Seq[String] =
     Seq(
       s"__kyuubi_operation_result_format__=$resultFormat",
       s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")
+
+  private def collectAsIterator(resultDF: DataFrame): FetchIterator[_] = {
+    val resultMaxRows = spark.conf.getOption(OPERATION_RESULT_MAX_ROWS.key).map(_.toInt)
+      .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_MAX_ROWS))
+    if (incrementalCollect) {
+      if (resultMaxRows > 0) {
+        warn(s"Ignore ${OPERATION_RESULT_MAX_ROWS.key} on incremental collect mode.")
+      }
+      info("Execute in incremental collect mode")
+      new IterableFetchIterator[Any](new Iterable[Any] {
+        override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
+      })
+    } else {
+      val internalArray = if (resultMaxRows <= 0) {
+        info("Execute in full collect mode")
+        fullCollectResult(resultDF)
+      } else {
+        info(s"Execute with max result rows[$resultMaxRows]")
+        takeResult(resultDF, resultMaxRows)
+      }
+      new ArrayFetchIterator(internalArray)
+    }
+  }
 }
 
 class ArrowBasedExecuteStatement(
@@ -182,30 +179,42 @@ class ArrowBasedExecuteStatement(
   extends ExecuteStatement(session, statement, shouldRunAsync, queryTimeout, incrementalCollect) {
 
   override protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = {
-    SparkDatasetHelper.toArrowBatchRdd(convertComplexType(resultDF)).toLocalIterator
+    collectAsArrow(convertComplexType(resultDF)) { rdd =>
+      rdd.toLocalIterator
+    }
   }
 
   override protected def fullCollectResult(resultDF: DataFrame): Array[_] = {
-    SparkDatasetHelper.toArrowBatchRdd(convertComplexType(resultDF)).collect()
+    collectAsArrow(convertComplexType(resultDF)) { rdd =>
+      rdd.collect()
+    }
   }
 
   override protected def takeResult(resultDF: DataFrame, maxRows: Int): Array[_] = {
     // this will introduce shuffle and hurt performance
     val limitedResult = resultDF.limit(maxRows)
-    SparkDatasetHelper.toArrowBatchRdd(convertComplexType(limitedResult)).collect()
+    collectAsArrow(convertComplexType(limitedResult)) { rdd =>
+      rdd.collect()
+    }
   }
 
   /**
-   * assign a new execution id for arrow-based operation.
+   * refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based
+   * operation, so that we can track the arrow-based queries on the UI tab.
    */
-  override protected def collectAsIterator(resultDF: DataFrame): FetchIterator[_] = {
-    SQLExecution.withNewExecutionId(resultDF.queryExecution, Some("collectAsArrow")) {
-      resultDF.queryExecution.executedPlan.resetMetrics()
-      super.collectAsIterator(resultDF)
+  private def collectAsArrow[T](df: DataFrame)(action: RDD[Array[Byte]] => T): T = {
+    SQLExecution.withNewExecutionId(df.queryExecution, Some("collectAsArrow")) {
+      df.queryExecution.executedPlan.resetMetrics()
+      action(SparkDatasetHelper.toArrowBatchRdd(df))
     }
   }
 
   override protected def isArrowBasedOperation: Boolean = true
 
   override val resultFormat = "arrow"
+
+  private def convertComplexType(df: DataFrame): DataFrame = {
+    SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df, timestampAsString)
+  }
+
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 30cdeca5a..ae6237bb5 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -103,22 +103,41 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     withJdbcStatement() { statement =>
       // since all the new sessions have their owner listener bus, we should register the listener
       // in the current session.
-      SparkSQLEngine.currentEngine.get
-        .backendService
-        .sessionManager
-        .allSessions()
-        .foreach(_.asInstanceOf[SparkSessionImpl].spark.listenerManager.register(listener))
+      registerListener(listener)
 
       val result = statement.executeQuery("select 1 as c1")
       assert(result.next())
       assert(result.getInt("c1") == 1)
     }
-
     KyuubiSparkContextHelper.waitListenerBus(spark)
-    spark.listenerManager.unregister(listener)
+    unregisterListener(listener)
     assert(plan.isInstanceOf[Project])
   }
 
+  test("arrow-based query metrics") {
+    var queryExecution: QueryExecution = null
+
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        queryExecution = qe
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+    }
+    withJdbcStatement() { statement =>
+      registerListener(listener)
+      val result = statement.executeQuery("select 1 as c1")
+      assert(result.next())
+      assert(result.getInt("c1") == 1)
+    }
+
+    KyuubiSparkContextHelper.waitListenerBus(spark)
+    unregisterListener(listener)
+
+    val metrics = queryExecution.executedPlan.collectLeaves().head.metrics
+    assert(metrics.contains("numOutputRows"))
+    assert(metrics("numOutputRows").value === 1)
+  }
+
   private def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
     val query =
       s"""
@@ -140,4 +159,22 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     assert(resultSet.next())
     assert(resultSet.getString("col") === expect)
   }
+
+  private def registerListener(listener: QueryExecutionListener): Unit = {
+    // since all the new sessions have their owner listener bus, we should register the listener
+    // in the current session.
+    SparkSQLEngine.currentEngine.get
+      .backendService
+      .sessionManager
+      .allSessions()
+      .foreach(_.asInstanceOf[SparkSessionImpl].spark.listenerManager.register(listener))
+  }
+
+  private def unregisterListener(listener: QueryExecutionListener): Unit = {
+    SparkSQLEngine.currentEngine.get
+      .backendService
+      .sessionManager
+      .allSessions()
+      .foreach(_.asInstanceOf[SparkSessionImpl].spark.listenerManager.unregister(listener))
+  }
 }