Posted to commits@spark.apache.org by do...@apache.org on 2018/12/14 21:05:27 UTC

[spark] branch branch-2.3 updated: [SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` metrics update

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 7930fbd  [SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` metrics update
7930fbd is described below

commit 7930fbdd14630959b18cc4548b1361ba4b91a1b0
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Fri Dec 14 13:05:14 2018 -0800

    [SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` metrics update
    
    ## What changes were proposed in this pull request?
    
    Backport #23277 to branch-2.3, without the metrics renaming.
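
    For context: the driver-side metric updates (`numFiles`, `metadataTime`)
    previously fired inside the lazy val `selectedPartitions`, which can be
    initialized during planning rather than execution. This patch defers the
    posting to `updateDriverMetrics()`, invoked from `inputRDD`, so it runs
    at execution time, after `selectedPartitions` is populated. A minimal
    sketch of how the fixed metrics can be observed, assuming a running
    `SparkSession` named `spark` and the partitioned table created by the
    new test below:

    ```scala
    // Hedged sketch, not part of the patch: run a partitioned scan and
    // inspect the scan node's driver-side metrics after execution.
    val df = spark.sql("SELECT * FROM testDataForScan WHERE p = 1")
    df.collect()  // executing the query triggers inputRDD, which posts metrics
    val scan = df.queryExecution.executedPlan.collectLeaves().head
    println(scan.metrics("numFiles").value)      // files read by this scan
    println(scan.metrics("metadataTime").value)  // partition listing time (ms)
    ```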
    
    ## How was this patch tested?
    
    New test case in `SQLMetricsSuite`.
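
    The test writes a small partitioned table and asserts the scan node's
    "number of output rows" and "number of files" metrics (see the diff
    below). To run just this case, something along the lines of
    `build/sbt "sql/testOnly *SQLMetricsSuite -- -z SPARK-26327"` should
    work, though the exact invocation depends on the local build setup.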
    
    Closes #23299 from xuanyuanking/SPARK-26327-2.3.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 26 +++++++++++++++-------
 .../sql/execution/metric/SQLMetricsSuite.scala     | 15 +++++++++++++
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0a23264..5543087 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -183,19 +183,14 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
+  private var metadataTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    metadataTime = timeTakenMs
     ret
   }
 
@@ -293,6 +288,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update metrics here so they take effect in both the whole-stage-codegen and normal paths.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -500,6 +497,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this function is called,
+   * `selectedPartitions` has already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics(): Unit = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(metadataTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a3a3f38..439a360 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -504,4 +504,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }

