Posted to commits@spark.apache.org by do...@apache.org on 2018/12/14 21:05:27 UTC
[spark] branch branch-2.3 updated: [SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` metrics update
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 7930fbd [SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` metrics update
7930fbd is described below
commit 7930fbdd14630959b18cc4548b1361ba4b91a1b0
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Fri Dec 14 13:05:14 2018 -0800
[SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` metrics update
## What changes were proposed in this pull request?
Backport #23277 to branch-2.3, without the metrics renaming. Previously, `FileSourceScanExec` updated and posted its `numFiles` and `metadataTime` driver metrics inside the lazy val `selectedPartitions`, which can be evaluated during planning, before the SQL execution ID is set on the calling thread; those posted updates were silently dropped. This patch records the metadata listing time in `selectedPartitions` and defers the metric update to `inputRDD` initialization, which runs inside the execution (see the sketch below).
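To illustrate the failure mode, here is a minimal, self-contained sketch. The names (`MetricPostingSketch`, `Metric`, `currentExecutionId`) are hypothetical stand-ins for Spark's `SQLMetric` and `SQLMetrics.postDriverMetricUpdates`, not the actual API; Spark reads the execution ID from a thread-local job property, modeled here as a plain `Option`:

```scala
// Hypothetical stand-ins for Spark's SQLMetric machinery, not its actual API.
object MetricPostingSketch {
  final class Metric(val name: String) {
    private var v = 0L
    def add(delta: Long): Unit = v += delta
    def value: Long = v
  }

  // Spark reads the execution ID from a thread-local job property;
  // a plain Option stands in for that here.
  private var currentExecutionId: Option[String] = None

  def postDriverMetricUpdates(metrics: Seq[Metric]): Unit = currentExecutionId match {
    case Some(id) =>
      println(s"posted to execution $id: " +
        metrics.map(m => s"${m.name}=${m.value}").mkString(", "))
    case None =>
      // No execution in progress: the update is silently dropped. This is
      // what happened when selectedPartitions was evaluated during planning.
      ()
  }

  def main(args: Array[String]): Unit = {
    val numFiles = new Metric("numFiles")
    numFiles.add(2)
    postDriverMetricUpdates(Seq(numFiles)) // during planning: dropped

    currentExecutionId = Some("42")        // execution has started
    postDriverMetricUpdates(Seq(numFiles)) // deferred update: delivered
  }
}
```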
## How was this patch tested?
New test case in `SQLMetricsSuite`.
Closes #23299 from xuanyuanking/SPARK-26327-2.3.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/execution/DataSourceScanExec.scala | 26 +++++++++++++++-------
.../sql/execution/metric/SQLMetricsSuite.scala | 15 +++++++++++++
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0a23264..5543087 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -183,19 +183,14 @@ case class FileSourceScanExec(
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)
+ private var metadataTime = 0L
+
@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
- metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
- metrics("metadataTime").add(timeTakenMs)
-
- val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
- metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+ metadataTime = timeTakenMs
ret
}
@@ -293,6 +288,8 @@ case class FileSourceScanExec(
}
private lazy val inputRDD: RDD[InternalRow] = {
+ // Update the metrics here so the update takes effect in both the
+ // whole-stage-codegen path and the normal path.
+ updateDriverMetrics()
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -500,6 +497,19 @@ case class FileSourceScanExec(
}
}
+ /**
+ * Send the updated metrics to the driver. By the time this function is called,
+ * selectedPartitions has already been initialized. See SPARK-26327 for more details.
+ */
+ private def updateDriverMetrics() = {
+ metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+ metrics("metadataTime").add(metadataTime)
+
+ val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+ metrics("numFiles") :: metrics("metadataTime") :: Nil)
+ }
+
override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a3a3f38..439a360 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -504,4 +504,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}
+
+ test("SPARK-26327: FileSourceScanExec metrics") {
+ withTable("testDataForScan") {
+ spark.range(10).selectExpr("id", "id % 3 as p")
+ .write.partitionBy("p").saveAsTable("testDataForScan")
+ // The execution plan only has 1 FileScan node.
+ val df = spark.sql(
+ "SELECT * FROM testDataForScan WHERE p = 1")
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> (("Scan parquet default.testdataforscan", Map(
+ "number of output rows" -> 3L,
+ "number of files" -> 2L))))
+ )
+ }
+ }
}
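The new test asserts the metrics through `testSparkPlanMetrics`. As a hedged sketch of checking the same values by hand on a local build of branch-2.3 (this is not part of the patch; it assumes a local `SparkSession` and relies on `collectLeaves()` and the scan's `metrics` map, both present in 2.3):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

object ScanMetricsInspection {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("SPARK-26327").getOrCreate()

    // Same data layout as the new test: two partition directories match p = 1... no,
    // p has values 0, 1, 2; the filter p = 1 selects files from one partition value,
    // written as 2 files, containing 3 rows.
    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").saveAsTable("testDataForScan")

    val df = spark.sql("SELECT * FROM testDataForScan WHERE p = 1")
    df.collect() // run the query so the deferred driver metrics are posted

    df.queryExecution.executedPlan.collectLeaves().foreach {
      case scan: FileSourceScanExec =>
        // With the fix, these reflect the executed scan: 2 files, 3 rows.
        println(s"numFiles = ${scan.metrics("numFiles").value}")
        println(s"numOutputRows = ${scan.metrics("numOutputRows").value}")
      case _ => ()
    }

    spark.stop()
  }
}
```

Once `df.collect()` has run, `inputRDD` has been initialized, so `updateDriverMetrics()` has posted the deferred values and the driver-side metrics hold the executed scan's counts rather than zeros.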