You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/16 17:48:10 UTC

[GitHub] xuanyuanking closed pull request #23298: [SPARK-26222][SQL] Track file listing time

xuanyuanking closed pull request #23298: [SPARK-26222][SQL] Track file listing time
URL: https://github.com/apache/spark/pull/23298
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
index cd75407c7ee7a..9d391741f596f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * There are two separate concepts we track:
  *
  * 1. Phases: These are broad scope phases in query planning, as listed below, i.e. analysis,
- * optimizationm and physical planning (just planning).
+ * optimization and physical planning (just planning).
  *
  * 2. Rules: These are the individual Catalyst rules that we track. In addition to time, we also
  * track the number of invocations and effective invocations.
@@ -41,6 +41,10 @@ object QueryPlanningTracker {
   val OPTIMIZATION = "optimization"
   val PLANNING = "planning"
 
+  // File listing related phases.
+  val FILE_LISTING = "fileListing"
+  val PARTITION_PRUNING = "partitionPruning"
+
   /**
    * Summary for a rule.
    * @param totalTimeNs total amount of time, in nanosecs, spent in this rule.
@@ -79,7 +83,11 @@ object QueryPlanningTracker {
   }
 
   /** Returns the current tracker in scope, based on the thread local variable. */
-  def get: Option[QueryPlanningTracker] = Option(localTracker.get())
+  def get: QueryPlanningTracker = Option(localTracker.get()).getOrElse(NoopTracker)
+
+  /** Returns the current tracker in scope, or creates a new one if none is set. */
+  def getOrCreate: QueryPlanningTracker =
+    Option(localTracker.get()).getOrElse(new QueryPlanningTracker)
 
   /** Sets the current tracker for the execution of function f. We assume f is single-threaded. */
   def withTracker[T](tracker: QueryPlanningTracker)(f: => T): T = {
@@ -161,3 +169,9 @@ class QueryPlanningTracker {
   }
 
 }
+
+/** A no-op tracker used when no tracker is set in the current scope. */
+object NoopTracker extends QueryPlanningTracker {
+  override def measurePhase[T](phase: String)(f: => T): T = f
+  override def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {}
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index cf6ff4f986399..f1a0b1ae2b6e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -86,7 +86,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
     var curPlan = plan
     val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
     val planChangeLogger = new PlanChangeLogger()
-    val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
+    val tracker: QueryPlanningTracker = QueryPlanningTracker.get
 
     batches.foreach { batch =>
       val batchStartPlan = curPlan
@@ -112,7 +112,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             queryExecutionMetrics.incNumExecution(rule.ruleName)
 
             // Record timing information using QueryPlanningTracker
-            tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
+            tracker.recordRuleInvocation(rule.ruleName, runTime, effective)
 
             // Run the structural integrity checker against the plan after each rule.
             if (!isPlanIntegral(result)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9751528654ffb..917c9cc371fc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
@@ -193,6 +194,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         "read files of Hive data source directly.")
     }
 
+    // Pass the new tracker into the created DataFrame.
+    val tracker = new QueryPlanningTracker
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[TableProvider].isAssignableFrom(cls)) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
@@ -211,25 +214,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       table match {
         case s: SupportsBatchRead =>
           Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-            provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema))
+            provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema), tracker)
 
-        case _ => loadV1Source(paths: _*)
+        case _ => loadV1Source(tracker, paths: _*)
       }
     } else {
-      loadV1Source(paths: _*)
+      loadV1Source(tracker, paths: _*)
     }
   }
 
-  private def loadV1Source(paths: String*) = {
-    // Code path for data source v1.
-    sparkSession.baseRelationToDataFrame(
-      DataSource.apply(
-        sparkSession,
-        paths = paths,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = source,
-        options = extraOptions.toMap).resolveRelation())
-  }
+  private def loadV1Source(tracker: QueryPlanningTracker, paths: String*) =
+    QueryPlanningTracker.withTracker(tracker) {
+      // Code path for data source v1.
+      sparkSession.baseRelationToDataFrame(
+        DataSource.apply(
+          sparkSession,
+          paths = paths,
+          userSpecifiedSchema = userSpecifiedSchema,
+          className = source,
+          options = extraOptions.toMap).resolveRelation())
+    }
 
   /**
    * Construct a `DataFrame` representing the database table accessible via JDBC URL
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 26272c3906685..e4034034ff63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -433,12 +433,13 @@ class SparkSession private(
   }
 
   /**
-   * Convert a `BaseRelation` created for external data sources into a `DataFrame`.
+   * Convert a `BaseRelation` created for external data sources into a `DataFrame`. Use the local
+   * tracker in scope if it exists.
    *
    * @since 2.0.0
    */
   def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
-    Dataset.ofRows(self, LogicalRelation(baseRelation))
+    Dataset.ofRows(self, LogicalRelation(baseRelation), QueryPlanningTracker.getOrCreate)
   }
 
   /* ------------------------------- *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index c0fa4e777b49c..f773d3640180b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{QueryPlanningMetricsSupport, SQLMetrics}
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -167,16 +167,8 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
-  private var fileListingTime = 0L
-
-  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
-    val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
-    val startTime = System.nanoTime()
-    val ret = relation.location.listFiles(partitionFilters, dataFilters)
-    val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-    fileListingTime = timeTakenMs
-    ret
-  }
+  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] =
+    relation.location.listFiles(partitionFilters, dataFilters)
 
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -310,11 +302,13 @@ case class FileSourceScanExec(
     inputRDD :: Nil
   }
 
+  private lazy val fileListingMetrics =
+    QueryPlanningMetricsSupport.createFileListingMetrics(sparkContext)
+
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) ++ fileListingMetrics
 
   protected override def doExecute(): RDD[InternalRow] = {
     if (supportsBatch) {
@@ -510,11 +504,13 @@ case class FileSourceScanExec(
    */
   private def updateDriverMetrics() = {
     metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
-    metrics("fileListingTime").add(fileListingTime)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
+    SQLMetrics.postDriverMetricUpdates(
+      sparkContext,
+      executionId,
+      QueryPlanningMetricsSupport.getUpdatedFileListingMetrics(fileListingMetrics)
+        :+ metrics("numFiles"))
   }
 
   override def doCanonicalize(): FileSourceScanExec = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index dda7cb55f5395..904b074ca6071 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
 
 object SQLExecution {
@@ -84,7 +85,10 @@ object SQLExecution {
             // will be caught and reported in the `SparkListenerSQLExecutionEnd`
             sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
             time = System.currentTimeMillis()))
-          body
+          // Set the tracker in the query execution as the `localTracker` during execute.
+          QueryPlanningTracker.withTracker(queryExecution.tracker) {
+            body
+          }
         } catch {
           case e: Exception =>
             ex = Some(e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index a66a07673e25f..5ff0e5edc89ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.StructType
@@ -69,20 +70,21 @@ class CatalogFileIndex(
    */
   def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
     if (table.partitionColumnNames.nonEmpty) {
-      val startTime = System.nanoTime()
-      val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
-        table.identifier, filters)
-      val partitions = selectedPartitions.map { p =>
-        val path = new Path(p.location)
-        val fs = path.getFileSystem(hadoopConf)
-        PartitionPath(
-          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory))
-      }
-      val partitionSpec = PartitionSpec(partitionSchema, partitions)
-      val timeNs = System.nanoTime() - startTime
+      val partitionSpec =
+        QueryPlanningTracker.get.measurePhase(QueryPlanningTracker.PARTITION_PRUNING) {
+          val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
+            table.identifier, filters)
+          val partitions = selectedPartitions.map { p =>
+            val path = new Path(p.location)
+            val fs = path.getFileSystem(hadoopConf)
+            PartitionPath(
+              p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
+              path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+          }
+          PartitionSpec(partitionSchema, partitions)
+        }
       new PrunedInMemoryFileIndex(
-        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
+        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
     } else {
       new InMemoryFileIndex(
         sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
@@ -113,8 +115,7 @@ private class PrunedInMemoryFileIndex(
     sparkSession: SparkSession,
     tableBasePath: Path,
     fileStatusCache: FileStatusCache,
-    override val partitionSpec: PartitionSpec,
-    override val metadataOpsTimeNs: Option[Long])
+    override val partitionSpec: PartitionSpec)
   extends InMemoryFileIndex(
     sparkSession,
     partitionSpec.partitions.map(_.path),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 094a66a2820f3..6b99d38fe5729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -72,14 +72,4 @@ trait FileIndex {
 
   /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
   def partitionSchema: StructType
-
-  /**
-   * Returns an optional metadata operation time, in nanoseconds, for listing files.
-   *
-   * We do file listing in query optimization (in order to get the proper statistics) and we want
-   * to account for file listing time in physical execution (as metrics). To do that, we save the
-   * file listing time in some implementations and physical execution calls it in this method
-   * to update the metrics.
-   */
-  def metadataOpsTimeNs: Option[Long] = None
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index fe418e610da8f..3edb870a90c5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -29,6 +29,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.execution.streaming.FileStreamSink
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -87,13 +88,14 @@ class InMemoryFileIndex(
     refresh0()
   }
 
-  private def refresh0(): Unit = {
-    val files = listLeafFiles(rootPaths)
-    cachedLeafFiles =
-      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
-    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
-    cachedPartitionSpec = null
-  }
+  private def refresh0(): Unit =
+    QueryPlanningTracker.get.measurePhase(QueryPlanningTracker.FILE_LISTING) {
+      val files = listLeafFiles(rootPaths)
+      cachedLeafFiles =
+        new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+      cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+      cachedPartitionSpec = null
+    }
 
   override def equals(other: Any): Boolean = other match {
     case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/QueryPlanningMetricsSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/QueryPlanningMetricsSupport.scala
new file mode 100644
index 0000000000000..d96a0c892efd5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/QueryPlanningMetricsSupport.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
+
+/**
+ * A helper object for the metrics that are tracked by [[QueryPlanningTracker]].
+ */
+object QueryPlanningMetricsSupport {
+  import QueryPlanningTracker._
+
+  val FILE_LISTING_TIME = FILE_LISTING + "Time"
+  private def startTimestampMetric(prefix: String) = prefix + "Start"
+  private def endTimestampMetric(prefix: String) = prefix + "End"
+
+  /**
+   * Creates all file-listing-related metrics and returns them as a Map.
+   */
+  def createFileListingMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
+    FILE_LISTING_TIME -> SQLMetrics.createMetric(sc, "total file listing time (ms)"),
+    startTimestampMetric(FILE_LISTING) ->
+      SQLMetrics.createTimestampMetric(sc, "file listing start"),
+    startTimestampMetric(PARTITION_PRUNING) ->
+      SQLMetrics.createTimestampMetric(sc, "partition pruning start"),
+    endTimestampMetric(PARTITION_PRUNING) ->
+      SQLMetrics.createTimestampMetric(sc, "partition pruning end"),
+    endTimestampMetric(FILE_LISTING) ->
+      SQLMetrics.createTimestampMetric(sc, "file listing end"))
+
+  /**
+   * Gets the updated file-listing-related metrics from the QueryPlanningTracker phases.
+   */
+  def getUpdatedFileListingMetrics(metrics: Map[String, SQLMetric]): Seq[SQLMetric] = {
+    val updatedMetrics = new ArrayBuffer[SQLMetric]()
+
+    // Update all metrics related to the file listing phase.
+    def phaseMetricsUpdate(phase: String): Unit = {
+      val phaseSummary = QueryPlanningTracker.get.phases.get(phase)
+      if (phaseSummary.isDefined) {
+        metrics(FILE_LISTING_TIME).add(phaseSummary.get.durationMs)
+        metrics(startTimestampMetric(phase)).set(phaseSummary.get.startTimeMs)
+        metrics(endTimestampMetric(phase)).set(phaseSummary.get.endTimeMs)
+
+        updatedMetrics.append(
+          metrics(FILE_LISTING_TIME),
+          metrics(startTimestampMetric(phase)),
+          metrics(endTimestampMetric(phase)))
+      }
+    }
+    phaseMetricsUpdate(FILE_LISTING)
+    phaseMetricsUpdate(PARTITION_PRUNING)
+
+    updatedMetrics.toSeq
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 19809b07508d9..5b66875cc349f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.metric
 
+import java.sql.Timestamp
 import java.text.NumberFormat
 import java.util.Locale
 
@@ -82,6 +83,7 @@ object SQLMetrics {
   private val TIMING_METRIC = "timing"
   private val NS_TIMING_METRIC = "nsTiming"
   private val AVERAGE_METRIC = "average"
+  private val TIMESTAMP_METRIC = "timestamp"
 
   private val baseForAvgMetric: Int = 10
 
@@ -131,6 +133,14 @@ object SQLMetrics {
     acc
   }
 
+  def createTimestampMetric(sc: SparkContext, name: String): SQLMetric = {
+    // The final result of this metric in the physical operator UI may look like:
+    // start: 2018-12-15T16:49:12.634
+    val acc = new SQLMetric(TIMESTAMP_METRIC)
+    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc
+  }
+
   /**
    * Create a metric to report the average information (including min, med, max) like
    * avg hash probe. As average metrics are double values, this kind of metrics should be
@@ -154,6 +164,11 @@ object SQLMetrics {
     if (metricsType == SUM_METRIC) {
       val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
       numberFormat.format(values.sum)
+    } else if (metricsType == TIMESTAMP_METRIC) {
+      val validValue = values.filter(_ > 0)
+      assert(validValue.size == 1, "Timestamp metrics should have only one valid value.")
+      val ts = new Timestamp(validValue.head)
+      ts.toLocalDateTime.toString
     } else if (metricsType == AVERAGE_METRIC) {
       val numberFormat = NumberFormat.getNumberInstance(Locale.US)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
index e42177c156ee9..2f756b73a3fc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
@@ -17,25 +17,61 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.test.SharedSQLContext
 
 class QueryPlanningTrackerEndToEndSuite extends SharedSQLContext {
 
+  private def checkTrackerWithPhaseKeySet(df: DataFrame, keySet: Set[String]) = {
+    val tracker = df.queryExecution.tracker
+    assert(tracker.phases.keySet == keySet)
+    assert(tracker.rules.nonEmpty)
+  }
+
   test("programmatic API") {
     val df = spark.range(1000).selectExpr("count(*)")
     df.collect()
-    val tracker = df.queryExecution.tracker
-    assert(tracker.phases.keySet == Set("analysis", "optimization", "planning"))
-    assert(tracker.rules.nonEmpty)
+    checkTrackerWithPhaseKeySet(df, Set("analysis", "optimization", "planning"))
   }
 
   test("sql") {
     val df = spark.sql("select * from range(1)")
     df.collect()
+    checkTrackerWithPhaseKeySet(df, Set("parsing", "analysis", "optimization", "planning"))
+  }
 
-    val tracker = df.queryExecution.tracker
-    assert(tracker.phases.keySet == Set("parsing", "analysis", "optimization", "planning"))
-    assert(tracker.rules.nonEmpty)
+  test("file listing time in schema resolving") {
+    withTempPath { path =>
+      val pathStr = path.getAbsolutePath
+      spark.range(0, 10).write.parquet(pathStr)
+      val df = spark.read.parquet(pathStr)
+      checkTrackerWithPhaseKeySet(
+        df,
+        Set("analysis", "fileListing"))
+    }
+  }
+
+  test("file listing time in execution") {
+    withTable("src") {
+      sql("CREATE TABLE src (key INT, value STRING) using parquet")
+      val df = spark.read.table("src")
+      df.collect()
+      val tracker = df.queryExecution.tracker
+      assert(tracker.phases.keySet ==
+        Set("planning", "optimization", "analysis", "fileListing"))
+      assert(tracker.rules.nonEmpty)
+    }
+  }
+
+  test("partition pruning time and file listing time") {
+    withTable("tbl") {
+      spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl")
+      val df = sql("SELECT * FROM tbl WHERE p = 1")
+      df.collect()
+      checkTrackerWithPhaseKeySet(
+        df,
+        Set("parsing", "analysis", "optimization", "planning", "fileListing", "partitionPruning"))
+    }
   }
 
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 47265df4831df..279a111226432 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -545,18 +545,59 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     }
   }
 
-  test("SPARK-26327: FileSourceScanExec metrics") {
+  test("File listing timestamp metrics with and without cache") {
+    withTable("parquetTable") {
+      sql("CREATE TABLE parquetTable (key INT, value STRING) using parquet")
+
+      // First time read.
+      val df0 = spark.read.table("parquetTable")
+      df0.collect()
+      val metrics0 = df0.queryExecution.executedPlan.collectLeaves().head.metrics
+      // Check that partition pruning duration and timestamps are unchanged.
+      assert(metrics0("partitionPruningStart").value == 0)
+      assert(metrics0("partitionPruningEnd").value == 0)
+      // Check file listing duration and timestamp change.
+      assert(metrics0("fileListingStart").value > 0)
+      assert(metrics0("fileListingEnd").value > 0)
+      assert(metrics0("fileListingTime").value > 0)
+
+      // Insert ten rows into the table.
+      spark.range(10).selectExpr("id", "id + 1").write.insertInto("parquetTable")
+
+      // On the second read, file listing time is not updated because it is read from cache.
+      val df1 = spark.read.table("parquetTable")
+      df1.collect()
+      val metrics1 = df1.queryExecution.executedPlan.collectLeaves().head.metrics
+      // Check deterministic metrics.
+      assert(metrics1("numFiles").value == 2)
+      assert(metrics1("numOutputRows").value == 10)
+      // Check file listing duration and timestamp.
+      assert(metrics1("fileListingStart").value == 0)
+      assert(metrics1("fileListingEnd").value == 0)
+      assert(metrics1("fileListingTime").value == 0)
+    }
+  }
+
+  test("File listing and partition pruning metrics") {
     withTable("testDataForScan") {
       spark.range(10).selectExpr("id", "id % 3 as p")
         .write.partitionBy("p").saveAsTable("testDataForScan")
       // The execution plan only has 1 FileScan node.
       val df = spark.sql(
         "SELECT * FROM testDataForScan WHERE p = 1")
-      testSparkPlanMetrics(df, 1, Map(
-        0L -> (("Scan parquet default.testdataforscan", Map(
-          "number of output rows" -> 3L,
-          "number of files" -> 2L))))
-      )
+      df.collect()
+      val metrics = df.queryExecution.executedPlan.collectLeaves().head.metrics
+
+      // Check deterministic metrics.
+      assert(metrics("numFiles").value == 2)
+      assert(metrics("numOutputRows").value == 3)
+      // Check file listing duration and timestamp changed.
+      assert(metrics("fileListingStart").value > 0)
+      assert(metrics("fileListingEnd").value > 0)
+      assert(metrics("fileListingTime").value > 0)
+      // Check partition pruning duration and timestamp changed.
+      assert(metrics("partitionPruningStart").value > 0)
+      assert(metrics("partitionPruningEnd").value > 0)
     }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org