You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/30 04:30:53 UTC

[spark] branch branch-3.2 updated: [SPARK-34399][SQL][3.2] Add commit duration to SQL tab's graph node

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a96e9e1  [SPARK-34399][SQL][3.2] Add commit duration to SQL tab's graph node
a96e9e1 is described below

commit a96e9e197ec0adb92a2464ba381ca67517a8b0aa
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Jul 30 12:30:20 2021 +0800

    [SPARK-34399][SQL][3.2] Add commit duration to SQL tab's graph node
    
    ### What changes were proposed in this pull request?
    Since we have already added logging for the commit time, I think it is useful to show it to users directly in the SQL tab's UI.
    
    ![image](https://user-images.githubusercontent.com/46485123/126647754-dc3ba83a-5391-427c-8a67-e6af46e82290.png)
    
    ### Why are the changes needed?
    So that users can see the commit duration directly.
    
    ### Does this PR introduce _any_ user-facing change?
    Users can see the file commit duration in the SQL tab's SQL plan graph.
    
    ### How was this patch tested?
    Manually tested; a unit test is also added in SQLMetricsSuite.
    
    Closes #33553 from AngersZhuuuu/SPARK-34399-FOLLOWUP.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/BasicWriteStatsTracker.scala       | 38 +++++++++++++++-------
 .../datasources/FileFormatDataWriter.scala         |  9 +++--
 .../execution/datasources/FileFormatWriter.scala   |  7 ++--
 .../execution/datasources/WriteStatsTracker.scala  |  6 ++--
 .../execution/datasources/v2/FileBatchWrite.scala  |  2 +-
 .../BasicWriteTaskStatsTrackerSuite.scala          |  2 +-
 .../CustomWriteTaskStatsTrackerSuite.scala         |  4 +--
 .../sql/execution/metric/SQLMetricsSuite.scala     | 34 +++++++++++++++++++
 8 files changed, 79 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 160ee6d..f3815ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -48,7 +49,9 @@ case class BasicWriteTaskStats(
 /**
  * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
  */
-class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
+class BasicWriteTaskStatsTracker(
+    hadoopConf: Configuration,
+    taskCommitTimeMetric: Option[SQLMetric] = None)
   extends WriteTaskStatsTracker with Logging {
 
   private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
@@ -155,7 +158,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
     numRows += 1
   }
 
-  override def getFinalStats(): WriteTaskStats = {
+  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
     submittedFiles.foreach(updateFileStats)
     submittedFiles.clear()
 
@@ -170,6 +173,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
         "This could be due to the output format not writing empty files, " +
         "or files being not immediately visible in the filesystem.")
     }
+    taskCommitTimeMetric.foreach(_ += taskCommitTime)
     BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
   }
 }
@@ -183,14 +187,21 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
  */
 class BasicWriteJobStatsTracker(
     serializableHadoopConf: SerializableConfiguration,
-    @transient val metrics: Map[String, SQLMetric])
+    @transient val driverSideMetrics: Map[String, SQLMetric],
+    taskCommitTimeMetric: SQLMetric)
   extends WriteJobStatsTracker {
 
+  def this(
+      serializableHadoopConf: SerializableConfiguration,
+      metrics: Map[String, SQLMetric]) = {
+    this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
+  }
+
   override def newTaskInstance(): WriteTaskStatsTracker = {
-    new BasicWriteTaskStatsTracker(serializableHadoopConf.value)
+    new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric))
   }
 
-  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
+  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
     val sparkContext = SparkContext.getActive.get
     var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
     var numFiles: Long = 0L
@@ -206,13 +217,14 @@ class BasicWriteJobStatsTracker(
       totalNumOutput += summary.numRows
     }
 
-    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
-    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
-    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
-    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(partitionsSet.size)
+    driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime)
+    driverSideMetrics(NUM_FILES_KEY).add(numFiles)
+    driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+    driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+    driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)
   }
 }
 
@@ -221,6 +233,8 @@ object BasicWriteJobStatsTracker {
   private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
   private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
   private val NUM_PARTS_KEY = "numParts"
+  val TASK_COMMIT_TIME = "taskCommitTime"
+  val JOB_COMMIT_TIME = "jobCommitTime"
   /** XAttr key of the data length header added in HADOOP-17414. */
   val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
 
@@ -230,7 +244,9 @@ object BasicWriteJobStatsTracker {
       NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
       NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
       NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
+      TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"),
+      JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
     )
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 365a903..815d8ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOut
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * Abstract class for writing out data in a single Spark task.
@@ -103,10 +103,13 @@ abstract class FileFormatDataWriter(
    */
   override def commit(): WriteTaskResult = {
     releaseResources()
+    val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
+      committer.commitTask(taskAttemptContext)
+    }
     val summary = ExecutedWriteSummary(
       updatedPartitions = updatedPartitions.toSet,
-      stats = statsTrackers.map(_.getFinalStats()))
-    WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
+      stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
+    WriteTaskResult(taskCommitMessage, summary)
   }
 
   def abort(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6839a4d..cd3d101 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -240,7 +240,7 @@ object FileFormatWriter extends Logging {
       val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
       logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
 
-      processStats(description.statsTrackers, ret.map(_.summary.stats))
+      processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
       logInfo(s"Finished processing stats for write job ${description.uuid}.")
 
       // return a set of all the partition paths that were updated during this job
@@ -328,7 +328,8 @@ object FileFormatWriter extends Logging {
    */
   private[datasources] def processStats(
       statsTrackers: Seq[WriteJobStatsTracker],
-      statsPerTask: Seq[Seq[WriteTaskStats]])
+      statsPerTask: Seq[Seq[WriteTaskStats]],
+      jobCommitDuration: Long)
   : Unit = {
 
     val numStatsTrackers = statsTrackers.length
@@ -345,7 +346,7 @@ object FileFormatWriter extends Logging {
     }
 
     statsTrackers.zip(statsPerTracker).foreach {
-      case (statsTracker, stats) => statsTracker.processStats(stats)
+      case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
index f58aa33..157ed01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
@@ -66,10 +66,11 @@ trait WriteTaskStatsTracker {
 
   /**
    * Returns the final statistics computed so far.
+   * @param taskCommitTime Time taken to commit the task, in milliseconds.
    * @note This may only be called once. Further use of the object may lead to undefined behavior.
    * @return An object of subtype of [[WriteTaskStats]], to be sent to the driver.
    */
-  def getFinalStats(): WriteTaskStats
+  def getFinalStats(taskCommitTime: Long): WriteTaskStats
 }
 
 
@@ -93,6 +94,7 @@ trait WriteJobStatsTracker extends Serializable {
    * Process the given collection of stats computed during this job.
    * E.g. aggregate them, write them to memory / disk, issue warnings, whatever.
    * @param stats One [[WriteTaskStats]] object from each successful write task.
+   * @param jobCommitTime Time taken to commit the job, in milliseconds.
    * @note The type of @param `stats` is too generic. These classes should probably be parametrized:
    *   WriteTaskStatsTracker[S <: WriteTaskStats]
    *   WriteJobStatsTracker[S <: WriteTaskStats, T <: WriteTaskStatsTracker[S]]
@@ -103,5 +105,5 @@ trait WriteJobStatsTracker extends Serializable {
    * to the expected derived type when implementing this method in a derived class.
    * The framework will make sure to call this with the right arguments.
    */
-  def processStats(stats: Seq[WriteTaskStats]): Unit
+  def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
index 7227e48..ead5114 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
@@ -36,7 +36,7 @@ class FileBatchWrite(
     val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) }
     logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
 
-    processStats(description.statsTrackers, results.map(_.summary.stats))
+    processStats(description.statsTrackers, results.map(_.summary.stats), duration)
     logInfo(s"Finished processing stats for write job ${description.uuid}.")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
index 0237679..982e428 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
@@ -73,7 +73,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
   }
 
   private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = {
-    tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
+    tracker.getFinalStats(0L).asInstanceOf[BasicWriteTaskStats]
   }
 
   test("No files in run") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
index 82d873a..e9f625b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 class CustomWriteTaskStatsTrackerSuite extends SparkFunSuite {
 
   def checkFinalStats(tracker: CustomWriteTaskStatsTracker, result: Map[String, Int]): Unit = {
-    assert(tracker.getFinalStats().asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
+    assert(tracker.getFinalStats(0L).asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
   }
 
   test("sequential file writing") {
@@ -64,7 +64,7 @@ class CustomWriteTaskStatsTracker extends WriteTaskStatsTracker {
     numRowsPerFile(filePath) += 1
   }
 
-  override def getFinalStats(): WriteTaskStats = {
+  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
     CustomWriteTaskStats(numRowsPerFile.toMap)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 9d4fbd7..32428fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -22,6 +22,9 @@ import java.io.File
 import scala.reflect.{classTag, ClassTag}
 import scala.util.Random
 
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -29,6 +32,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, SQLHadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.functions._
@@ -790,6 +794,24 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("SPARK-34399: Add job commit duration metrics for DataWritingCommand") {
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+      "org.apache.spark.sql.execution.metric.CustomFileCommitProtocol") {
+      withTable("t", "t2") {
+        sql("CREATE TABLE t(id STRING) USING PARQUET")
+        val df = sql("INSERT INTO TABLE t SELECT 'abc'")
+        val insert = df.queryExecution.executedPlan.collect {
+          case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd
+        }
+        assert(insert.size == 1)
+        assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME))
+        assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME))
+        assert(insert.head.metrics(BasicWriteJobStatsTracker.JOB_COMMIT_TIME).value > 0)
+        assert(insert.head.metrics(BasicWriteJobStatsTracker.TASK_COMMIT_TIME).value > 0)
+      }
+    }
+  }
+
   test("SPARK-34567: Add metrics for CTAS operator") {
     withTable("t") {
       val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
@@ -807,3 +829,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 }
+
+case class CustomFileCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+  override def commitTask(
+    taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
+    Thread.sleep(Random.nextInt(100))
+    super.commitTask(taskContext)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org