Posted to commits@spark.apache.org by we...@apache.org on 2018/12/11 10:47:59 UTC

[spark] branch master updated: [SPARK-26327][SQL] Bug fix for `FileSourceScanExec` metrics update and name changing

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd7df6b  [SPARK-26327][SQL] Bug fix for `FileSourceScanExec` metrics update and name changing
bd7df6b is described below

commit bd7df6b1e129741136d09a3d29f9ffcc32ce1de3
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Tue Dec 11 18:47:21 2018 +0800

    [SPARK-26327][SQL] Bug fix for `FileSourceScanExec` metrics update and name changing
    
    ## What changes were proposed in this pull request?
    
    As described in [SPARK-26327](https://issues.apache.org/jira/browse/SPARK-26327), `postDriverMetricUpdates` was called in the wrong place, which caused this bug. Fix it by separating the initialization of `selectedPartitions` from the metric-updating logic, and move that logic into the initialization of `inputRDD`, where it takes effect in both the code generation path and the normal path. Also rename `metadataTime` to `fileListingTime` for a clearer meaning.
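
    The core of the fix is posting the driver-side metrics from a place that both execution paths reach. Below is a minimal, self-contained sketch of that posting mechanism against a plain `SparkSession`; the object name and the stand-alone `main` are illustrative only and are not part of this patch.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SQLExecution
    import org.apache.spark.sql.execution.metric.SQLMetrics

    object DriverMetricUpdateSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("metric-sketch").getOrCreate()
        val sc = spark.sparkContext

        // Create a driver-side metric, as FileSourceScanExec does for "number of files".
        val numFiles = SQLMetrics.createMetric(sc, "number of files")
        numFiles.add(42L)

        // The value only shows up in the SQL UI when it is posted under the current
        // SQL execution id, which Spark keeps in a thread-local property.
        val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
        SQLMetrics.postDriverMetricUpdates(sc, executionId, numFiles :: Nil)

        spark.stop()
      }
    }
    ```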
    ## How was this patch tested?
    
    New test case in `SQLMetricsSuite`.
    Manual test (a sketch of the kind of check used is shown after the table):
    
    |         | Before | After |
    |---------|:--------:|:-------:|
    | CodeGen |![image](https://user-images.githubusercontent.com/4833765/49741753-13c7e800-fcd2-11e8-97a8-8057b657aa3c.png)|![image](https://user-images.githubusercontent.com/4833765/49741774-1f1b1380-fcd2-11e8-98d9-78b950f4e43a.png)|
    | Normal  |![image](https://user-images.githubusercontent.com/4833765/49741836-378b2e00-fcd2-11e8-80c3-ab462a6a3184.png)|![image](https://user-images.githubusercontent.com/4833765/49741860-4a056780-fcd2-11e8-9ef1-863de217f183.png)|
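
    A sketch of the kind of manual check behind the screenshots above, run in spark-shell; the output path, data, and filter are illustrative only:

    ```scala
    // Write a small partitioned dataset, scan one partition, and read the
    // driver-side metrics off the FileSourceScanExec leaf of the executed plan.
    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").parquet("/tmp/testDataForScan")

    val df = spark.read.parquet("/tmp/testDataForScan").where("p = 1")
    df.collect()

    val scan = df.queryExecution.executedPlan.collectLeaves().head
    println(scan.metrics("numFiles").value)          // number of files
    println(scan.metrics("fileListingTime").value)   // file listing time (ms)
    ```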
    
    Closes #23277 from xuanyuanking/SPARK-26327.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 28 +++++++++++++++-------
 .../sql/execution/metric/SQLMetricsSuite.scala     | 15 ++++++++++++
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index b29d5c7..c0fa4e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -167,19 +167,14 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
+  private var fileListingTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    fileListingTime = timeTakenMs
     ret
   }
 
@@ -291,6 +286,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update the metrics here so that the update takes effect in both the code generation path and the normal path.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -316,7 +313,7 @@ case class FileSourceScanExec(
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
+      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -507,6 +504,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. When this function is called, selectedPartitions has
+   * already been initialized. See SPARK-26327 for more details.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("fileListingTime").add(fileListingTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 2251607..4a80638 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -636,4 +636,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       assert(filters.head.metrics("numOutputRows").value == 1)
     }
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org