You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/11 10:47:59 UTC
[spark] branch master updated: [SPARK-26327][SQL] Bug fix for
`FileSourceScanExec` metrics update and name changing
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd7df6b [SPARK-26327][SQL] Bug fix for `FileSourceScanExec` metrics update and name changing
bd7df6b is described below
commit bd7df6b1e129741136d09a3d29f9ffcc32ce1de3
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Tue Dec 11 18:47:21 2018 +0800
[SPARK-26327][SQL] Bug fix for `FileSourceScanExec` metrics update and name changing
## What changes were proposed in this pull request?
As described in [SPARK-26327](https://issues.apache.org/jira/browse/SPARK-26327), this bug is caused by `postDriverMetricUpdates` being called in the wrong place. This patch fixes it by separating the initialization of `selectedPartitions` from the metrics-updating logic, and by moving the metrics update into the initialization of `inputRDD`, so that it takes effect in both the whole-stage code-generation path and the normal (non-codegen) execution path. It also renames `metadataTime` to `fileListingTime` to make the metric's meaning clearer.
## How was this patch tested?
New test case in `SQLMetricsSuite`.
Manual test:
| | Before | After |
|---------|:--------:|:-------:|
| CodeGen |![image](https://user-images.githubusercontent.com/4833765/49741753-13c7e800-fcd2-11e8-97a8-8057b657aa3c.png)|![image](https://user-images.githubusercontent.com/4833765/49741774-1f1b1380-fcd2-11e8-98d9-78b950f4e43a.png)|
| Normal |![image](https://user-images.githubusercontent.com/4833765/49741836-378b2e00-fcd2-11e8-80c3-ab462a6a3184.png)|![image](https://user-images.githubusercontent.com/4833765/49741860-4a056780-fcd2-11e8-9ef1-863de217f183.png)|
Closes #23277 from xuanyuanking/SPARK-26327.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/DataSourceScanExec.scala | 28 +++++++++++++++-------
.../sql/execution/metric/SQLMetricsSuite.scala | 15 ++++++++++++
2 files changed, 34 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index b29d5c7..c0fa4e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -167,19 +167,14 @@ case class FileSourceScanExec(
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)
+ private var fileListingTime = 0L
+
@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
- metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
- metrics("metadataTime").add(timeTakenMs)
-
- val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
- metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+ fileListingTime = timeTakenMs
ret
}
@@ -291,6 +286,8 @@ case class FileSourceScanExec(
}
private lazy val inputRDD: RDD[InternalRow] = {
+ // Update metrics for taking effect in both code generation node and normal node.
+ updateDriverMetrics()
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -316,7 +313,7 @@ case class FileSourceScanExec(
override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
- "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
+ "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
protected override def doExecute(): RDD[InternalRow] = {
@@ -507,6 +504,19 @@ case class FileSourceScanExec(
}
}
+ /**
+ * Sends the updated metrics to the driver. Reading `selectedPartitions` first forces its lazy
+ * initialization, which sets `fileListingTime` before it is read. See SPARK-26327 for details.
+ */
+ private def updateDriverMetrics() = {
+ metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+ metrics("fileListingTime").add(fileListingTime)
+
+ val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+ metrics("numFiles") :: metrics("fileListingTime") :: Nil)
+ }
+
override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 2251607..4a80638 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -636,4 +636,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
assert(filters.head.metrics("numOutputRows").value == 1)
}
}
+
+ test("SPARK-26327: FileSourceScanExec metrics") {
+ withTable("testDataForScan") {
+ spark.range(10).selectExpr("id", "id % 3 as p")
+ .write.partitionBy("p").saveAsTable("testDataForScan")
+ // The execution plan only has 1 FileScan node.
+ val df = spark.sql(
+ "SELECT * FROM testDataForScan WHERE p = 1")
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> (("Scan parquet default.testdataforscan", Map(
+ "number of output rows" -> 3L,
+ "number of files" -> 2L))))
+ )
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org