You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2023/04/26 09:59:38 UTC

[kyuubi] branch master updated: [KYUUBI #4710][ARROW][FOLLOWUP] Post driver-side metrics for LocalTableScanExec/CommandResultExec

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7012aa20 [KYUUBI #4710][ARROW][FOLLOWUP] Post driver-side metrics for LocalTableScanExec/CommandResultExec
b7012aa20 is described below

commit b7012aa206eb836073987cb6c8aa59ec4421c174
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Wed Apr 26 17:59:24 2023 +0800

    [KYUUBI #4710][ARROW][FOLLOWUP] Post driver-side metrics for LocalTableScanExec/CommandResultExec
    
    ### _Why are the changes needed?_
    
    to resolve https://github.com/apache/kyuubi/pull/4710#discussion_r1168600486
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4769 from cfmcgrady/arrow-send-driver-metrics.
    
    Closes #4710
    
    a952d088b [Fu Chen] refactor
    a5645de90 [Fu Chen] address comment
    6749630ee [Fu Chen] update
    2dff41eeb [Fu Chen] add SparkMetricsTestUtils
    8c772bca7 [Fu Chen] ut
    4e3cd7d11 [Fu Chen] metrics
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: ulyssesyou <ul...@apache.org>
---
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 37 ++++++++-----
 .../operation/SparkArrowbasedOperationSuite.scala  | 60 ++++++++++++++++++++--
 .../execution/metric/SparkMetricsTestUtils.scala   | 53 +++++++++++++++++++
 3 files changed, 134 insertions(+), 16 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 285a28a60..8f8aef560 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kyuubi
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
@@ -28,6 +29,7 @@ import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, Spa
 import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
@@ -184,26 +186,36 @@ object SparkDatasetHelper extends Logging {
     result.toArray
   }
 
-  def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
+  private lazy val commandResultExecRowsMethod = DynMethods.builder("rows")
+    .impl("org.apache.spark.sql.execution.CommandResultExec")
+    .build()
+
+  private def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
+    val spark = SparkSession.active
+    // TODO: replace with `command.rows` once we drop Spark 3.1 support.
+    val rows = commandResultExecRowsMethod.invoke[Seq[InternalRow]](command)
+    command.longMetric("numOutputRows").add(rows.size)
+    sendDriverMetrics(spark.sparkContext, command.metrics)
     KyuubiArrowConverters.toBatchIterator(
-      // TODO: replace with `command.rows.iterator` once we drop Spark 3.1 support.
-      commandResultExecRowsMethod.invoke[Seq[InternalRow]](command).iterator,
+      rows.iterator,
       command.schema,
-      SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch,
+      spark.sessionState.conf.arrowMaxRecordsPerBatch,
       maxBatchSize,
       -1,
-      SparkSession.active.sessionState.conf.sessionLocalTimeZone).toArray
+      spark.sessionState.conf.sessionLocalTimeZone).toArray
   }
 
-  def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = {
+  private def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = {
+    val spark = SparkSession.active
     localTableScan.longMetric("numOutputRows").add(localTableScan.rows.size)
+    sendDriverMetrics(spark.sparkContext, localTableScan.metrics)
     KyuubiArrowConverters.toBatchIterator(
       localTableScan.rows.iterator,
       localTableScan.schema,
-      SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch,
+      spark.sessionState.conf.arrowMaxRecordsPerBatch,
       maxBatchSize,
       -1,
-      SparkSession.active.sessionState.conf.sessionLocalTimeZone).toArray
+      spark.sessionState.conf.sessionLocalTimeZone).toArray
   }
 
   /**
@@ -268,10 +280,6 @@ object SparkDatasetHelper extends Logging {
     sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec"
   }
 
-  private lazy val commandResultExecRowsMethod = DynMethods.builder("rows")
-    .impl("org.apache.spark.sql.execution.CommandResultExec")
-    .build()
-
   /**
    * refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based
    * operation, so that we can track the arrow-based queries on the UI tab.
@@ -282,4 +290,9 @@ object SparkDatasetHelper extends Logging {
       body
     }
   }
+
+  private def sendDriverMetrics(sc: SparkContext, metrics: Map[String, SQLMetric]): Unit = {
+    val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 057d8c6ff..9273ab879 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.engine.spark.operation
 
 import java.sql.Statement
-import java.util.{Set => JSet}
+import java.util.{Locale, Set => JSet}
 
 import org.apache.spark.KyuubiSparkContextHelper
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.metric.SparkMetricsTestUtils
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kyuubi.SparkDatasetHelper
@@ -41,7 +42,8 @@ import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.SparkDataTypeTests
 import org.apache.kyuubi.reflection.DynFields
 
-class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests {
+class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
+  with SparkMetricsTestUtils {
 
   override protected def jdbcUrl: String = getJdbcUrl
 
@@ -58,6 +60,16 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     withJdbcStatement() { statement =>
       checkResultSetFormat(statement, "arrow")
     }
+    spark.catalog.listTables()
+      .collect()
+      .foreach { table =>
+        if (table.isTemporary) {
+          spark.catalog.dropTempView(table.name)
+        } else {
+          spark.sql(s"DROP TABLE IF EXISTS ${table.name}")
+        }
+        ()
+      }
   }
 
   test("detect resultSet format") {
@@ -288,13 +300,12 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
       assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec")
     }
     withJdbcStatement("table_1") { statement =>
-      statement.executeQuery(s"CREATE TABLE table_1 (id bigint) USING parquet")
+      statement.executeQuery("CREATE TABLE table_1 (id bigint) USING parquet")
       withSparkListener(listener) {
         withSparkListener(l2) {
           val resultSet = statement.executeQuery("SHOW TABLES")
           assert(resultSet.next())
           assert(resultSet.getString("tableName") == "table_1")
-          KyuubiSparkContextHelper.waitListenerBus(spark)
         }
       }
     }
@@ -348,6 +359,33 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     assert(metrics("numOutputRows").value === 1)
   }
 
+  test("post LocalTableScanExec driver-side metrics") {
+    val expectedMetrics = Map(
+      0L -> (("LocalTableScan", Map("number of output rows" -> "2"))))
+    withTables("view_1") {
+      val s = spark
+      import s.implicits._
+      Seq((1, "a"), (2, "b")).toDF("c1", "c2").createOrReplaceTempView("view_1")
+      val df = spark.sql("SELECT * FROM view_1")
+      val metrics = getSparkPlanMetrics(df)
+      assert(metrics == expectedMetrics)
+    }
+  }
+
+  test("post CommandResultExec driver-side metrics") {
+    spark.sql("show tables").show(truncate = false)
+    assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.2")
+    val expectedMetrics = Map(
+      0L -> (("CommandResult", Map("number of output rows" -> "2"))))
+    withTables("table_1", "table_2") {
+      spark.sql("CREATE TABLE table_1 (id bigint) USING parquet")
+      spark.sql("CREATE TABLE table_2 (id bigint) USING parquet")
+      val df = spark.sql("SHOW TABLES")
+      val metrics = getSparkPlanMetrics(df)
+      assert(metrics == expectedMetrics)
+    }
+  }
+
   private def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
     val query =
       s"""
@@ -465,6 +503,20 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     }
   }
 
+  private def withTables[T](tableNames: String*)(f: => T): T = {
+    try {
+      f
+    } finally {
+      tableNames.foreach { name =>
+        if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
+          spark.sql(s"DROP VIEW IF EXISTS $name")
+        } else {
+          spark.sql(s"DROP TABLE IF EXISTS $name")
+        }
+      }
+    }
+  }
+
   /**
    * This method provides a reflection-based implementation of [[SQLConf.isStaticConfigKey]] to
    * adapt Spark-3.1.x
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/execution/metric/SparkMetricsTestUtils.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/execution/metric/SparkMetricsTestUtils.scala
new file mode 100644
index 000000000..7ab06f0ef
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/execution/metric/SparkMetricsTestUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.kyuubi.SparkDatasetHelper
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+
+trait SparkMetricsTestUtils {
+  this: WithSparkSQLEngine =>
+
+  private lazy val statusStore = spark.sharedState.statusStore
+  private def currentExecutionIds(): Set[Long] = {
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+    statusStore.executionsList.map(_.executionId).toSet
+  }
+
+  protected def getSparkPlanMetrics(df: DataFrame): Map[Long, (String, Map[String, Any])] = {
+    val previousExecutionIds = currentExecutionIds()
+    SparkDatasetHelper.executeCollect(df)
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+    val executionIds = currentExecutionIds().diff(previousExecutionIds)
+    assert(executionIds.size === 1)
+    val executionId = executionIds.head
+    val metricValues = statusStore.executionMetrics(executionId)
+    SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)).allNodes
+      .map { node =>
+        val nodeMetrics = node.metrics.map { metric =>
+          val metricValue = metricValues(metric.accumulatorId)
+          (metric.name, metricValue)
+        }.toMap
+        (node.id, node.name -> nodeMetrics)
+      }.toMap
+  }
+}