Posted to commits@kyuubi.apache.org by ul...@apache.org on 2023/04/26 09:59:38 UTC
[kyuubi] branch master updated: [KYUUBI #4710][ARROW][FOLLOWUP] Post driver-side metrics for LocalTableScanExec/CommandResultExec
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b7012aa20 [KYUUBI #4710][ARROW][FOLLOWUP] Post driver-side metrics for LocalTableScanExec/CommandResultExec
b7012aa20 is described below
commit b7012aa206eb836073987cb6c8aa59ec4421c174
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Wed Apr 26 17:59:24 2023 +0800
[KYUUBI #4710][ARROW][FOLLOWUP] Post driver-side metrics for LocalTableScanExec/CommandResultExec
### _Why are the changes needed?_
To resolve https://github.com/apache/kyuubi/pull/4710#discussion_r1168600486
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4769 from cfmcgrady/arrow-send-driver-metrics.
Closes #4710
a952d088b [Fu Chen] refactor
a5645de90 [Fu Chen] address comment
6749630ee [Fu Chen] update
2dff41eeb [Fu Chen] add SparkMetricsTestUtils
8c772bca7 [Fu Chen] ut
4e3cd7d11 [Fu Chen] metrics
Authored-by: Fu Chen <cf...@gmail.com>
Signed-off-by: ulyssesyou <ul...@apache.org>
---
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 37 ++++++++-----
.../operation/SparkArrowbasedOperationSuite.scala | 60 ++++++++++++++++++++--
.../execution/metric/SparkMetricsTestUtils.scala | 53 +++++++++++++++++++
3 files changed, 134 insertions(+), 16 deletions(-)
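
For context: `LocalTableScanExec` and `CommandResultExec` materialize their rows on the driver, so bumping `numOutputRows` there only updates the accumulator locally; the SQL tab is fed by listener events, not by reading driver-side accumulators. A minimal sketch of the mechanism the patch relies on (the helper name is illustrative; the Spark APIs are real):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative helper, not part of the patch: publish a driver-updated
// metric so SQLAppStatusListener (and hence the SQL UI) can see it.
def postOnDriver(sc: SparkContext, metric: SQLMetric, rows: Long): Unit = {
  metric.add(rows) // local accumulator update only; the UI sees nothing yet
  // Non-null only inside an execution scope (SQLExecution.withNewExecutionId)
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  // Posts a SparkListenerDriverAccumUpdates event for this execution
  SQLMetrics.postDriverMetricUpdates(sc, executionId, Seq(metric))
}
```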
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 285a28a60..8f8aef560 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kyuubi
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rdd.RDD
@@ -28,6 +29,7 @@ import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, Spa
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
@@ -184,26 +186,36 @@ object SparkDatasetHelper extends Logging {
result.toArray
}
- def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
+ private lazy val commandResultExecRowsMethod = DynMethods.builder("rows")
+ .impl("org.apache.spark.sql.execution.CommandResultExec")
+ .build()
+
+ private def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
+ val spark = SparkSession.active
+ // TODO: replace with `command.rows` once we drop Spark 3.1 support.
+ val rows = commandResultExecRowsMethod.invoke[Seq[InternalRow]](command)
+ command.longMetric("numOutputRows").add(rows.size)
+ sendDriverMetrics(spark.sparkContext, command.metrics)
KyuubiArrowConverters.toBatchIterator(
- // TODO: replace with `command.rows.iterator` once we drop Spark 3.1 support.
- commandResultExecRowsMethod.invoke[Seq[InternalRow]](command).iterator,
+ rows.iterator,
command.schema,
- SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch,
+ spark.sessionState.conf.arrowMaxRecordsPerBatch,
maxBatchSize,
-1,
- SparkSession.active.sessionState.conf.sessionLocalTimeZone).toArray
+ spark.sessionState.conf.sessionLocalTimeZone).toArray
}
- def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = {
+ private def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = {
+ val spark = SparkSession.active
localTableScan.longMetric("numOutputRows").add(localTableScan.rows.size)
+ sendDriverMetrics(spark.sparkContext, localTableScan.metrics)
KyuubiArrowConverters.toBatchIterator(
localTableScan.rows.iterator,
localTableScan.schema,
- SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch,
+ spark.sessionState.conf.arrowMaxRecordsPerBatch,
maxBatchSize,
-1,
- SparkSession.active.sessionState.conf.sessionLocalTimeZone).toArray
+ spark.sessionState.conf.sessionLocalTimeZone).toArray
}
/**
@@ -268,10 +280,6 @@ object SparkDatasetHelper extends Logging {
sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec"
}
- private lazy val commandResultExecRowsMethod = DynMethods.builder("rows")
- .impl("org.apache.spark.sql.execution.CommandResultExec")
- .build()
-
/**
* refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based
* operation, so that we can track the arrow-based queries on the UI tab.
@@ -282,4 +290,9 @@ object SparkDatasetHelper extends Logging {
body
}
}
+
+ private def sendDriverMetrics(sc: SparkContext, metrics: Map[String, SQLMetric]): Unit = {
+ val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
+ }
}
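
Note that `sendDriverMetrics` above only reaches the UI when the calling thread is inside an execution scope: `EXECUTION_ID_KEY` is a thread-local property set by `SQLExecution.withNewExecutionId`, and `postDriverMetricUpdates` silently drops the update when it is null. A hedged sketch of that precondition, assuming an active session:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution

val spark = SparkSession.active
val qe = spark.sql("SHOW TABLES").queryExecution
SQLExecution.withNewExecutionId(qe) {
  // Inside this block the property is set; driver metric updates posted
  // here are attributed to this execution on the SQL tab.
  val id = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  assert(id != null)
  qe.executedPlan.executeCollect()
}
```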
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 057d8c6ff..9273ab879 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.engine.spark.operation
import java.sql.Statement
-import java.util.{Set => JSet}
+import java.util.{Locale, Set => JSet}
import org.apache.spark.KyuubiSparkContextHelper
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.metric.SparkMetricsTestUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kyuubi.SparkDatasetHelper
@@ -41,7 +42,8 @@ import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.SparkDataTypeTests
import org.apache.kyuubi.reflection.DynFields
-class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests {
+class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
+ with SparkMetricsTestUtils {
override protected def jdbcUrl: String = getJdbcUrl
@@ -58,6 +60,16 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
withJdbcStatement() { statement =>
checkResultSetFormat(statement, "arrow")
}
+ spark.catalog.listTables()
+ .collect()
+ .foreach { table =>
+ if (table.isTemporary) {
+ spark.catalog.dropTempView(table.name)
+ } else {
+ spark.sql(s"DROP TABLE IF EXISTS ${table.name}")
+ }
+ ()
+ }
}
test("detect resultSet format") {
@@ -288,13 +300,12 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec")
}
withJdbcStatement("table_1") { statement =>
- statement.executeQuery(s"CREATE TABLE table_1 (id bigint) USING parquet")
+ statement.executeQuery("CREATE TABLE table_1 (id bigint) USING parquet")
withSparkListener(listener) {
withSparkListener(l2) {
val resultSet = statement.executeQuery("SHOW TABLES")
assert(resultSet.next())
assert(resultSet.getString("tableName") == "table_1")
- KyuubiSparkContextHelper.waitListenerBus(spark)
}
}
}
@@ -348,6 +359,33 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
assert(metrics("numOutputRows").value === 1)
}
+ test("post LocalTableScanExec driver-side metrics") {
+ val expectedMetrics = Map(
+ 0L -> (("LocalTableScan", Map("number of output rows" -> "2"))))
+ withTables("view_1") {
+ val s = spark
+ import s.implicits._
+ Seq((1, "a"), (2, "b")).toDF("c1", "c2").createOrReplaceTempView("view_1")
+ val df = spark.sql("SELECT * FROM view_1")
+ val metrics = getSparkPlanMetrics(df)
+ assert(metrics == expectedMetrics)
+ }
+ }
+
+ test("post CommandResultExec driver-side metrics") {
+ spark.sql("show tables").show(truncate = false)
+ assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.2")
+ val expectedMetrics = Map(
+ 0L -> (("CommandResult", Map("number of output rows" -> "2"))))
+ withTables("table_1", "table_2") {
+ spark.sql("CREATE TABLE table_1 (id bigint) USING parquet")
+ spark.sql("CREATE TABLE table_2 (id bigint) USING parquet")
+ val df = spark.sql("SHOW TABLES")
+ val metrics = getSparkPlanMetrics(df)
+ assert(metrics == expectedMetrics)
+ }
+ }
+
private def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
val query =
s"""
@@ -465,6 +503,20 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
}
}
+ private def withTables[T](tableNames: String*)(f: => T): T = {
+ try {
+ f
+ } finally {
+ tableNames.foreach { name =>
+ if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
+ spark.sql(s"DROP VIEW IF EXISTS $name")
+ } else {
+ spark.sql(s"DROP TABLE IF EXISTS $name")
+ }
+ }
+ }
+ }
+
/**
* This method provides a reflection-based implementation of [[SQLConf.isStaticConfigKey]] to
* adapt Spark-3.1.x
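
`CommandResultExec` only exists on Spark 3.2+ while the engine still compiles against 3.1, which is why both `SparkDatasetHelper` and this suite reach it reflectively instead of referencing the class. A sketch of the pattern (`CommandResultRows` is an illustrative wrapper; the `DynMethods` calls match the patch):

```scala
import org.apache.kyuubi.reflection.DynMethods
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SparkPlan

object CommandResultRows {
  // Resolve CommandResultExec#rows by name at runtime so this also loads
  // on Spark 3.1, where the class is absent.
  private lazy val rowsMethod = DynMethods.builder("rows")
    .impl("org.apache.spark.sql.execution.CommandResultExec")
    .build()

  def apply(plan: SparkPlan): Seq[InternalRow] =
    rowsMethod.invoke[Seq[InternalRow]](plan)
}
```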
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/execution/metric/SparkMetricsTestUtils.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/execution/metric/SparkMetricsTestUtils.scala
new file mode 100644
index 000000000..7ab06f0ef
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/execution/metric/SparkMetricsTestUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.kyuubi.SparkDatasetHelper
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+
+trait SparkMetricsTestUtils {
+ this: WithSparkSQLEngine =>
+
+ private lazy val statusStore = spark.sharedState.statusStore
+ private def currentExecutionIds(): Set[Long] = {
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+ statusStore.executionsList.map(_.executionId).toSet
+ }
+
+ protected def getSparkPlanMetrics(df: DataFrame): Map[Long, (String, Map[String, Any])] = {
+ val previousExecutionIds = currentExecutionIds()
+ SparkDatasetHelper.executeCollect(df)
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+ val executionIds = currentExecutionIds().diff(previousExecutionIds)
+ assert(executionIds.size === 1)
+ val executionId = executionIds.head
+ val metricValues = statusStore.executionMetrics(executionId)
+ SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)).allNodes
+ .map { node =>
+ val nodeMetrics = node.metrics.map { metric =>
+ val metricValue = metricValues(metric.accumulatorId)
+ (metric.name, metricValue)
+ }.toMap
+ (node.id, node.name -> nodeMetrics)
+ }.toMap
+ }
+}
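
A hedged usage sketch of the helper above, mirroring the LocalTableScan test: with the driver-side post in place, the node's metric map in the status store is populated instead of empty.

```scala
// Assumes view_1 is registered as in the suite's LocalTableScanExec test.
val df = spark.sql("SELECT * FROM view_1")
val metrics = getSparkPlanMetrics(df) // Map[Long, (String, Map[String, Any])]
assert(metrics.values.exists { case (name, nodeMetrics) =>
  name == "LocalTableScan" &&
    nodeMetrics.get("number of output rows").contains("2")
})
```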