Posted to commits@spark.apache.org by we...@apache.org on 2017/07/06 07:47:14 UTC

spark git commit: [SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations

Repository: spark
Updated Branches:
  refs/heads/master 5800144a5 -> 6ff05a66f


[SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations

## What changes were proposed in this pull request?

Since SPARK-20213, the UI can show the operations that write data out. However, there is currently no way to associate metrics with those data writes. We should show the relevant metrics on these operations.

#### Supported commands

This change supports updating metrics for file-based data-writing operations, namely `InsertIntoHadoopFsRelationCommand` and `InsertIntoHiveTable`.

Supported metrics (a brief usage sketch follows the list):

* number of written files
* number of dynamic partitions
* total bytes of written data
* total number of output rows
* average writing time (ms)
* (TODO) min/med/max number of output rows per file/partition
* (TODO) min/med/max bytes of written data per file/partition
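
For example (a minimal sketch, not part of this patch): a file-based write issued through `DataFrameWriter` is planned as `InsertIntoHadoopFsRelationCommand`, so after this change its node in the SQL tab of the web UI carries the metrics above. The session setup and output path below are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session and output path; adjust to your environment.
val spark = SparkSession.builder().appName("write-metrics-demo").getOrCreate()
import spark.implicits._

val df = (0 until 100).map(i => (i, i + 1)).toDF("i", "j")

// This DataFrameWriter call is planned as InsertIntoHadoopFsRelationCommand;
// open the SQL tab in the web UI to see "number of written files",
// "number of dynamic part", "bytes of written output", "number of output rows"
// and "average writing time (ms)" on that node.
df.repartition(2).write.mode("overwrite").parquet("/tmp/write-metrics-demo")
```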

#### Commands not supported

`InsertIntoDataSourceCommand`, `SaveIntoDataSourceCommand`:

These two commands use the DataSource APIs to write data out, i.e., the logic of writing data out is delegated to the DataSource implementations, such as `InsertableRelation.insert` and `CreatableRelationProvider.createRelation`. So we can't obtain metrics from the delegated methods for now; a sketch of these two hooks follows.
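
For reference, a minimal sketch of the two DataSource API hooks involved (`MyRelation` and `MyProvider` are hypothetical names): neither returns any per-file information, which is why these commands are left out.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, InsertableRelation}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical relation: InsertIntoDataSourceCommand delegates to `insert`,
// which returns Unit, so no write summary (files, bytes, rows) flows back.
class MyRelation(override val sqlContext: SQLContext) extends BaseRelation with InsertableRelation {
  override def schema: StructType = StructType(StructField("i", IntegerType) :: Nil)

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // source-specific write logic; nothing about the written files is reported back
  }
}

// Hypothetical provider: SaveIntoDataSourceCommand delegates to `createRelation`,
// which only returns a BaseRelation, again without any write metrics attached.
class MyProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // write `data` out according to `mode` and `parameters`
    new MyRelation(sqlContext)
  }
}
```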

`CreateHiveTableAsSelectCommand`, `CreateDataSourceTableAsSelectCommand` :

These two commands invoke other commands to write data out, and the invoked commands can even write to non-file-based data sources. We leave them as a future TODO.

#### How to update metrics of writing files out

A `RunnableCommand` that wants to update metrics needs to override its `metrics` field and provide the metrics data structure to `ExecutedCommandExec`.

The metric values are collected during the execution of `FileFormatWriter`. The callback function passed to `FileFormatWriter` accepts the collected write summaries and updates the metrics accordingly.

There is a metrics-updating function in `DataWritingCommand` (a specialized `RunnableCommand`). At runtime, the function is bound to the Spark context and the `metrics` of `ExecutedCommandExec`, and is passed to `FileFormatWriter`. A simplified standalone sketch of this wiring follows.
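
The sketch below is a standalone, simplified model of this wiring in plain Scala, not Spark code: `WriteSummary`, `WritingCommand`, and `ToyWriter` are illustrative stand-ins for `ExecutedWriteSummary`, `DataWritingCommand`, and `FileFormatWriter`.

```scala
import scala.collection.mutable

final case class WriteSummary(
    updatedPartitions: Set[String],
    numOutputFile: Int,
    numOutputRows: Long,
    numOutputBytes: Long,
    totalWritingTime: Long)

trait WritingCommand {
  // Stand-in for RunnableCommand.metrics, which ExecutedCommandExec exposes to the UI.
  val metrics: mutable.Map[String, Long] = mutable.Map(
    "numFiles" -> 0L, "numParts" -> 0L, "numOutputRows" -> 0L,
    "numOutputBytes" -> 0L, "avgTime" -> 0L)

  // Stand-in for updateWritingMetrics: the callback the writer invokes on the
  // driver once every task has committed, with one summary per write task.
  def updateWritingMetrics(summaries: Seq[WriteSummary]): Unit = {
    val numFiles = summaries.map(_.numOutputFile).sum
    metrics("numFiles") += numFiles
    metrics("numParts") += summaries.map(_.updatedPartitions.size).sum
    metrics("numOutputRows") += summaries.map(_.numOutputRows).sum
    metrics("numOutputBytes") += summaries.map(_.numOutputBytes).sum
    metrics("avgTime") += (if (numFiles > 0) summaries.map(_.totalWritingTime).sum / numFiles else 0L)
  }
}

// Toy writer: in Spark this role is played by FileFormatWriter.write, which takes
// the callback as its `refreshFunction` argument and calls it after committing the job.
object ToyWriter {
  def write(taskResults: Seq[WriteSummary], refreshFunction: Seq[WriteSummary] => Unit): Unit = {
    // ... commit the job here ...
    refreshFunction(taskResults)
  }
}
```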

## How was this patch tested?

Updated unit tests.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #18159 from viirya/SPARK-20703-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ff05a66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ff05a66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ff05a66

Branch: refs/heads/master
Commit: 6ff05a66fe83e721063efe5c28d2ffeb850fecc7
Parents: 5800144
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Jul 6 15:47:09 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 6 15:47:09 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     |   9 ++
 .../execution/command/DataWritingCommand.scala  |  75 ++++++++++
 .../spark/sql/execution/command/commands.scala  |  12 ++
 .../datasources/FileFormatWriter.scala          | 121 +++++++++++++---
 .../InsertIntoHadoopFsRelationCommand.scala     |  18 ++-
 .../sql/sources/PartitionedWriteSuite.scala     |  21 +--
 .../hive/execution/InsertIntoHiveTable.scala    |   8 +-
 .../sql/hive/execution/SQLMetricsSuite.scala    | 139 +++++++++++++++++++
 8 files changed, 362 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 26f61e2..b4caf68 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1003,6 +1003,15 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Lists files recursively.
+   */
+  def recursiveList(f: File): Array[File] = {
+    require(f.isDirectory)
+    val current = f.listFiles
+    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+  }
+
+  /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
    * Throws an exception if deletion is unsuccessful.

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
new file mode 100644
index 0000000..0c381a2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+      "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+    )
+  }
+
+  /**
+   * Callback function that updates the metrics collected from the writing operation.
+   */
+  protected def updateWritingMetrics(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+    val sparkContext = SparkContext.getActive.get
+    var numPartitions = 0
+    var numFiles = 0
+    var totalNumBytes: Long = 0L
+    var totalNumOutput: Long = 0L
+    var totalWritingTime: Long = 0L
+
+    writeSummaries.foreach { summary =>
+      numPartitions += summary.updatedPartitions.size
+      numFiles += summary.numOutputFile
+      totalNumBytes += summary.numOutputBytes
+      totalNumOutput += summary.numOutputRows
+      totalWritingTime += summary.totalWritingTime
+    }
+
+    val avgWritingTime = if (numFiles > 0) {
+      (totalWritingTime / numFiles).toLong
+    } else {
+      0L
+    }
+
+    metrics("avgTime").add(avgWritingTime)
+    metrics("numFiles").add(numFiles)
+    metrics("numOutputBytes").add(totalNumBytes)
+    metrics("numOutputRows").add(totalNumOutput)
+    metrics("numParts").add(numPartitions)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 81bc93e..7cd4bae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
@@ -37,6 +38,11 @@ import org.apache.spark.sql.types._
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
+
+  // The map used to record the metrics of running the command. This will be passed to
+  // `ExecutedCommand` during query planning.
+  lazy val metrics: Map[String, SQLMetric] = Map.empty
+
   def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
     throw new NotImplementedError
   }
@@ -49,8 +55,14 @@ trait RunnableCommand extends logical.Command {
 /**
  * A physical operator that executes the run method of a `RunnableCommand` and
  * saves the result to prevent multiple executions.
+ *
+ * @param cmd the `RunnableCommand` this operator will run.
+ * @param children the children physical plans run by the `RunnableCommand`.
  */
 case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
    * command or any other computation that should be evaluated exactly once. The value of this field

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 0daffa9..6486663 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -22,7 +22,7 @@ import java.util.{Date, UUID}
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -82,7 +82,7 @@ object FileFormatWriter extends Logging {
   }
 
   /** The result of a successful write task. */
-  private case class WriteTaskResult(commitMsg: TaskCommitMessage, updatedPartitions: Set[String])
+  private case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)
 
   /**
    * Basic work flow of this command is:
@@ -104,7 +104,7 @@ object FileFormatWriter extends Logging {
       hadoopConf: Configuration,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
-      refreshFunction: (Seq[TablePartitionSpec]) => Unit,
+      refreshFunction: (Seq[ExecutedWriteSummary]) => Unit,
       options: Map[String, String]): Unit = {
 
     val job = Job.getInstance(hadoopConf)
@@ -196,12 +196,10 @@ object FileFormatWriter extends Logging {
         })
 
       val commitMsgs = ret.map(_.commitMsg)
-      val updatedPartitions = ret.flatMap(_.updatedPartitions)
-        .distinct.map(PartitioningUtils.parsePathFragment)
 
       committer.commitJob(job, commitMsgs)
       logInfo(s"Job ${job.getJobID} committed.")
-      refreshFunction(updatedPartitions)
+      refreshFunction(ret.map(_.summary))
     } catch { case cause: Throwable =>
       logError(s"Aborting job ${job.getJobID}.", cause)
       committer.abortJob(job)
@@ -247,9 +245,9 @@ object FileFormatWriter extends Logging {
     try {
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         // Execute the task to write rows out and commit the task.
-        val outputPartitions = writeTask.execute(iterator)
+        val summary = writeTask.execute(iterator)
         writeTask.releaseResources()
-        WriteTaskResult(committer.commitTask(taskAttemptContext), outputPartitions)
+        WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
       })(catchBlock = {
         // If there is an error, release resource and then abort the task
         try {
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
    * automatically trigger task aborts.
    */
   private trait ExecuteWriteTask {
+
     /**
-     * Writes data out to files, and then returns the list of partition strings written out.
-     * The list of partitions is sent back to the driver and used to update the catalog.
+     * The data structures used to measure metrics during writing.
      */
-    def execute(iterator: Iterator[InternalRow]): Set[String]
+    protected var totalWritingTime: Long = 0L
+    protected var timeOnCurrentFile: Long = 0L
+    protected var numOutputRows: Long = 0L
+    protected var numOutputBytes: Long = 0L
+
+    /**
+     * Writes data out to files, and then returns the summary of related information which
+     * includes the list of partition strings written out. The list of partitions is sent back
+     * to the driver and used to update the catalog. Other information will be sent back to the
+     * driver too and used to update the metrics in UI.
+     */
+    def execute(iterator: Iterator[InternalRow]): ExecutedWriteSummary
     def releaseResources(): Unit
+
+    /**
+     * A helper function used to determine the size in bytes of a written file.
+     */
+    protected def getFileSize(conf: Configuration, filePath: String): Long = {
+      if (filePath != null) {
+        val path = new Path(filePath)
+        val fs = path.getFileSystem(conf)
+        fs.getFileStatus(path).getLen()
+      } else {
+        0L
+      }
+    }
   }
 
   /** Writes data to a single directory (used for non-dynamic-partition writes). */
@@ -288,24 +310,26 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol) extends ExecuteWriteTask {
 
     private[this] var currentWriter: OutputWriter = _
+    private[this] var currentPath: String = _
 
     private def newOutputWriter(fileCounter: Int): Unit = {
       val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
-      val tmpFilePath = committer.newTaskTempFile(
+      currentPath = committer.newTaskTempFile(
         taskAttemptContext,
         None,
         f"-c$fileCounter%03d" + ext)
 
       currentWriter = description.outputWriterFactory.newInstance(
-        path = tmpFilePath,
+        path = currentPath,
         dataSchema = description.dataColumns.toStructType,
         context = taskAttemptContext)
     }
 
-    override def execute(iter: Iterator[InternalRow]): Set[String] = {
+    override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
       newOutputWriter(fileCounter)
+
       while (iter.hasNext) {
         if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
           fileCounter += 1
@@ -314,21 +338,35 @@ object FileFormatWriter extends Logging {
 
           recordsInFile = 0
           releaseResources()
+          numOutputRows += recordsInFile
           newOutputWriter(fileCounter)
         }
 
         val internalRow = iter.next()
+        val startTime = System.nanoTime()
         currentWriter.write(internalRow)
+        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       releaseResources()
-      Set.empty
+      numOutputRows += recordsInFile
+
+      ExecutedWriteSummary(
+        updatedPartitions = Set.empty,
+        numOutputFile = fileCounter + 1,
+        numOutputBytes = numOutputBytes,
+        numOutputRows = numOutputRows,
+        totalWritingTime = totalWritingTime)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
+          val startTime = System.nanoTime()
           currentWriter.close()
+          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
+          timeOnCurrentFile = 0
+          numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
         }
@@ -348,6 +386,8 @@ object FileFormatWriter extends Logging {
     // currentWriter is initialized whenever we see a new key
     private var currentWriter: OutputWriter = _
 
+    private var currentPath: String = _
+
     /** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
     private def partitionPathExpression: Seq[Expression] = {
       desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
@@ -403,19 +443,19 @@ object FileFormatWriter extends Logging {
         case _ =>
           None
       }
-      val path = if (customPath.isDefined) {
+      currentPath = if (customPath.isDefined) {
         committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
       } else {
         committer.newTaskTempFile(taskAttemptContext, partDir, ext)
       }
 
       currentWriter = desc.outputWriterFactory.newInstance(
-        path = path,
+        path = currentPath,
         dataSchema = desc.dataColumns.toStructType,
         context = taskAttemptContext)
     }
 
-    override def execute(iter: Iterator[InternalRow]): Set[String] = {
+    override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
       val getPartitionColsAndBucketId = UnsafeProjection.create(
         desc.partitionColumns ++ desc.bucketIdExpression, desc.allColumns)
 
@@ -429,15 +469,22 @@ object FileFormatWriter extends Logging {
       // If anything below fails, we should abort the task.
       var recordsInFile: Long = 0L
       var fileCounter = 0
+      var totalFileCounter = 0
       var currentPartColsAndBucketId: UnsafeRow = null
       val updatedPartitions = mutable.Set[String]()
+
       for (row <- iter) {
         val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
         if (currentPartColsAndBucketId != nextPartColsAndBucketId) {
+          if (currentPartColsAndBucketId != null) {
+            totalFileCounter += (fileCounter + 1)
+          }
+
           // See a new partition or bucket - write to a new partition dir (or a new bucket file).
           currentPartColsAndBucketId = nextPartColsAndBucketId.copy()
           logDebug(s"Writing partition: $currentPartColsAndBucketId")
 
+          numOutputRows += recordsInFile
           recordsInFile = 0
           fileCounter = 0
 
@@ -447,6 +494,8 @@ object FileFormatWriter extends Logging {
             recordsInFile >= desc.maxRecordsPerFile) {
           // Exceeded the threshold in terms of the number of records per file.
           // Create a new file by increasing the file counter.
+
+          numOutputRows += recordsInFile
           recordsInFile = 0
           fileCounter += 1
           assert(fileCounter < MAX_FILE_COUNTER,
@@ -455,18 +504,33 @@ object FileFormatWriter extends Logging {
           releaseResources()
           newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions)
         }
-
+        val startTime = System.nanoTime()
         currentWriter.write(getOutputRow(row))
+        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
+      if (currentPartColsAndBucketId != null) {
+        totalFileCounter += (fileCounter + 1)
+      }
       releaseResources()
-      updatedPartitions.toSet
+      numOutputRows += recordsInFile
+
+      ExecutedWriteSummary(
+        updatedPartitions = updatedPartitions.toSet,
+        numOutputFile = totalFileCounter,
+        numOutputBytes = numOutputBytes,
+        numOutputRows = numOutputRows,
+        totalWritingTime = totalWritingTime)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
+          val startTime = System.nanoTime()
           currentWriter.close()
+          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
+          timeOnCurrentFile = 0
+          numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
         }
@@ -474,3 +538,20 @@ object FileFormatWriter extends Logging {
     }
   }
 }
+
+/**
+ * Wrapper class for the metrics of writing data out.
+ *
+ * @param updatedPartitions the partitions updated during writing data out. Only valid
+ *                          for dynamic partition.
+ * @param numOutputFile the total number of files.
+ * @param numOutputRows the number of output rows.
+ * @param numOutputBytes the bytes of output data.
+ * @param totalWritingTime the total writing time in ms.
+ */
+case class ExecutedWriteSummary(
+  updatedPartitions: Set[String],
+  numOutputFile: Int,
+  numOutputRows: Long,
+  numOutputBytes: Long,
+  totalWritingTime: Long)

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index ab26f2a..0031567 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
 /**
  * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
@@ -53,7 +55,7 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex])
-  extends RunnableCommand {
+  extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
   override def children: Seq[LogicalPlan] = query :: Nil
@@ -123,8 +125,16 @@ case class InsertIntoHadoopFsRelationCommand(
 
     if (doInsertion) {
 
-      // Callback for updating metastore partition metadata after the insertion job completes.
-      def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
+      // Callback for updating metric and metastore partition metadata
+      // after the insertion job completes.
+      def refreshCallback(summary: Seq[ExecutedWriteSummary]): Unit = {
+        val updatedPartitions = summary.flatMap(_.updatedPartitions)
+          .distinct.map(PartitioningUtils.parsePathFragment)
+
+        // Updating metrics.
+        updateWritingMetrics(summary)
+
+        // Updating metastore partition metadata.
         if (partitionsTrackedByCatalog) {
           val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
           if (newPartitions.nonEmpty) {
@@ -154,7 +164,7 @@ case class InsertIntoHadoopFsRelationCommand(
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
-        refreshFunction = refreshPartitionsCallback,
+        refreshFunction = refreshCallback,
         options = options)
 
       // refresh cached files in FileIndex

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index a2f3afe..6f998aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -91,15 +91,15 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     withTempDir { f =>
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", 1).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
 
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", 2).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
 
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", -1).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
     }
   }
 
@@ -111,7 +111,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
         .option("maxRecordsPerFile", 1)
         .mode("overwrite")
         .parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
     }
   }
 
@@ -138,14 +138,14 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     val df = Seq((1, ts)).toDF("i", "ts")
     withTempPath { f =>
       df.write.partitionBy("ts").parquet(f.getAbsolutePath)
-      val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
       assert(files.length == 1)
       checkPartitionValues(files.head, "2016-12-01 00:00:00")
     }
     withTempPath { f =>
       df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
         .partitionBy("ts").parquet(f.getAbsolutePath)
-      val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
       assert(files.length == 1)
       // use timeZone option "GMT" to format partition value.
       checkPartitionValues(files.head, "2016-12-01 08:00:00")
@@ -153,18 +153,11 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     withTempPath { f =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
         df.write.partitionBy("ts").parquet(f.getAbsolutePath)
-        val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+        val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
         assert(files.length == 1)
         // if there isn't timeZone option, then use session local timezone.
         checkPartitionValues(files.head, "2016-12-01 08:00:00")
       }
     }
   }
-
-  /** Lists files recursively. */
-  private def recursiveList(f: File): Array[File] = {
-    require(f.isDirectory)
-    val current = f.listFiles
-    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 223d375..cd263e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -31,14 +31,16 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.{CommandUtils, RunnableCommand}
+import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
@@ -80,7 +82,7 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends RunnableCommand {
+    ifPartitionNotExists: Boolean) extends DataWritingCommand {
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
@@ -354,7 +356,7 @@ case class InsertIntoHiveTable(
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
-      refreshFunction = _ => (),
+      refreshFunction = updateWritingMetrics,
       options = Map.empty)
 
     if (partition.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
new file mode 100644
index 0000000..1ef1988
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
+  import spark.implicits._
+
+  /**
+   * Get execution metrics for the SQL execution and verify metrics values.
+   *
+   * @param metricsValues the expected metric values (numFiles, numPartitions, numOutputRows).
+   * @param func the function that produces an execution id after running.
+   */
+  private def verifyWriteDataMetrics(metricsValues: Seq[Int])(func: => Unit): Unit = {
+    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+    // Run the given function to trigger query execution.
+    func
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+    val executionIds =
+      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    assert(executionIds.size == 1)
+    val executionId = executionIds.head
+
+    val executionData = spark.sharedState.listener.getExecution(executionId).get
+    val executedNode = executionData.physicalPlanGraph.nodes.head
+
+    val metricsNames = Seq(
+      "number of written files",
+      "number of dynamic part",
+      "number of output rows")
+
+    val metrics = spark.sharedState.listener.getExecutionMetrics(executionId)
+
+    metricsNames.zip(metricsValues).foreach { case (metricsName, expected) =>
+      val sqlMetric = executedNode.metrics.find(_.name == metricsName)
+      assert(sqlMetric.isDefined)
+      val accumulatorId = sqlMetric.get.accumulatorId
+      val metricValue = metrics(accumulatorId).replaceAll(",", "").toInt
+      assert(metricValue == expected)
+    }
+
+    val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
+    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
+    assert(totalNumBytes > 0)
+    val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get
+    val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt
+    assert(writingTime >= 0)
+  }
+
+  private def testMetricsNonDynamicPartition(
+      dataFormat: String,
+      tableName: String): Unit = {
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j")
+        .write.format(dataFormat).mode("overwrite").saveAsTable(tableName)
+
+      val tableLocation =
+        new File(spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)
+
+      // 2 files, 100 rows, 0 dynamic partition.
+      verifyWriteDataMetrics(Seq(2, 0, 100)) {
+        (0 until 100).map(i => (i, i + 1)).toDF("i", "j").repartition(2)
+          .write.format(dataFormat).mode("overwrite").insertInto(tableName)
+      }
+      assert(Utils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
+    }
+  }
+
+  private def testMetricsDynamicPartition(
+      provider: String,
+      dataFormat: String,
+      tableName: String): Unit = {
+    withTempPath { dir =>
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName(a int, b int)
+           |USING $provider
+           |PARTITIONED BY(a)
+           |LOCATION '${dir.toURI}'
+         """.stripMargin)
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+      val df = spark.range(start = 0, end = 40, step = 1, numPartitions = 1)
+        .selectExpr("id a", "id b")
+
+      // 40 files, 80 rows, 40 dynamic partitions.
+      verifyWriteDataMetrics(Seq(40, 40, 80)) {
+        df.union(df).repartition(2, $"a")
+          .write
+          .format(dataFormat)
+          .mode("overwrite")
+          .insertInto(tableName)
+      }
+      assert(Utils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
+    }
+  }
+
+  test("writing data out metrics: parquet") {
+    testMetricsNonDynamicPartition("parquet", "t1")
+  }
+
+  test("writing data out metrics with dynamic partition: parquet") {
+    testMetricsDynamicPartition("parquet", "parquet", "t1")
+  }
+
+  test("writing data out metrics: hive") {
+    testMetricsNonDynamicPartition("hive", "t1")
+  }
+
+  test("writing data out metrics dynamic partition: hive") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      testMetricsDynamicPartition("hive", "hive", "t1")
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org