You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/30 04:30:53 UTC
[spark] branch branch-3.2 updated: [SPARK-34399][SQL][3.2] Add
commit duration to SQL tab's graph node
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new a96e9e1 [SPARK-34399][SQL][3.2] Add commit duration to SQL tab's graph node
a96e9e1 is described below
commit a96e9e197ec0adb92a2464ba381ca67517a8b0aa
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Jul 30 12:30:20 2021 +0800
[SPARK-34399][SQL][3.2] Add commit duration to SQL tab's graph node
### What changes were proposed in this pull request?
Since we have added a log about commit time, I think this is useful and we can let users see it directly in the SQL tab's UI.
![image](https://user-images.githubusercontent.com/46485123/126647754-dc3ba83a-5391-427c-8a67-e6af46e82290.png)
### Why are the changes needed?
Let users directly see the commit duration.
### Does this PR introduce _any_ user-facing change?
User can see file commit duration in SQL tab's SQL plan graph
### How was this patch tested?
Manually tested
Closes #33553 from AngersZhuuuu/SPARK-34399-FOLLOWUP.
Authored-by: Angerszhuuuu <an...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/BasicWriteStatsTracker.scala | 38 +++++++++++++++-------
.../datasources/FileFormatDataWriter.scala | 9 +++--
.../execution/datasources/FileFormatWriter.scala | 7 ++--
.../execution/datasources/WriteStatsTracker.scala | 6 ++--
.../execution/datasources/v2/FileBatchWrite.scala | 2 +-
.../BasicWriteTaskStatsTrackerSuite.scala | 2 +-
.../CustomWriteTaskStatsTrackerSuite.scala | 4 +--
.../sql/execution/metric/SQLMetricsSuite.scala | 34 +++++++++++++++++++
8 files changed, 79 insertions(+), 23 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 160ee6d..f3815ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration
@@ -48,7 +49,9 @@ case class BasicWriteTaskStats(
/**
* Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
*/
-class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
+class BasicWriteTaskStatsTracker(
+ hadoopConf: Configuration,
+ taskCommitTimeMetric: Option[SQLMetric] = None)
extends WriteTaskStatsTracker with Logging {
private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
@@ -155,7 +158,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
numRows += 1
}
- override def getFinalStats(): WriteTaskStats = {
+ override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
submittedFiles.foreach(updateFileStats)
submittedFiles.clear()
@@ -170,6 +173,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
"This could be due to the output format not writing empty files, " +
"or files being not immediately visible in the filesystem.")
}
+ taskCommitTimeMetric.foreach(_ += taskCommitTime)
BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
}
}
@@ -183,14 +187,21 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
*/
class BasicWriteJobStatsTracker(
serializableHadoopConf: SerializableConfiguration,
- @transient val metrics: Map[String, SQLMetric])
+ @transient val driverSideMetrics: Map[String, SQLMetric],
+ taskCommitTimeMetric: SQLMetric)
extends WriteJobStatsTracker {
+ def this(
+ serializableHadoopConf: SerializableConfiguration,
+ metrics: Map[String, SQLMetric]) = {
+ this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
+ }
+
override def newTaskInstance(): WriteTaskStatsTracker = {
- new BasicWriteTaskStatsTracker(serializableHadoopConf.value)
+ new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric))
}
- override def processStats(stats: Seq[WriteTaskStats]): Unit = {
+ override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
val sparkContext = SparkContext.getActive.get
var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
var numFiles: Long = 0L
@@ -206,13 +217,14 @@ class BasicWriteJobStatsTracker(
totalNumOutput += summary.numRows
}
- metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
- metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
- metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
- metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(partitionsSet.size)
+ driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime)
+ driverSideMetrics(NUM_FILES_KEY).add(numFiles)
+ driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+ driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+ driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)
}
}
@@ -221,6 +233,8 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
+ val TASK_COMMIT_TIME = "taskCommitTime"
+ val JOB_COMMIT_TIME = "jobCommitTime"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
@@ -230,7 +244,9 @@ object BasicWriteJobStatsTracker {
NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
- NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+ NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
+ TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"),
+ JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 365a903..815d8ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOut
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* Abstract class for writing out data in a single Spark task.
@@ -103,10 +103,13 @@ abstract class FileFormatDataWriter(
*/
override def commit(): WriteTaskResult = {
releaseResources()
+ val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
+ committer.commitTask(taskAttemptContext)
+ }
val summary = ExecutedWriteSummary(
updatedPartitions = updatedPartitions.toSet,
- stats = statsTrackers.map(_.getFinalStats()))
- WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
+ stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
+ WriteTaskResult(taskCommitMessage, summary)
}
def abort(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6839a4d..cd3d101 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -240,7 +240,7 @@ object FileFormatWriter extends Logging {
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
- processStats(description.statsTrackers, ret.map(_.summary.stats))
+ processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
logInfo(s"Finished processing stats for write job ${description.uuid}.")
// return a set of all the partition paths that were updated during this job
@@ -328,7 +328,8 @@ object FileFormatWriter extends Logging {
*/
private[datasources] def processStats(
statsTrackers: Seq[WriteJobStatsTracker],
- statsPerTask: Seq[Seq[WriteTaskStats]])
+ statsPerTask: Seq[Seq[WriteTaskStats]],
+ jobCommitDuration: Long)
: Unit = {
val numStatsTrackers = statsTrackers.length
@@ -345,7 +346,7 @@ object FileFormatWriter extends Logging {
}
statsTrackers.zip(statsPerTracker).foreach {
- case (statsTracker, stats) => statsTracker.processStats(stats)
+ case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
index f58aa33..157ed01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
@@ -66,10 +66,11 @@ trait WriteTaskStatsTracker {
/**
* Returns the final statistics computed so far.
+ * @param taskCommitTime Time of committing the task.
* @note This may only be called once. Further use of the object may lead to undefined behavior.
* @return An object of subtype of [[WriteTaskStats]], to be sent to the driver.
*/
- def getFinalStats(): WriteTaskStats
+ def getFinalStats(taskCommitTime: Long): WriteTaskStats
}
@@ -93,6 +94,7 @@ trait WriteJobStatsTracker extends Serializable {
* Process the given collection of stats computed during this job.
* E.g. aggregate them, write them to memory / disk, issue warnings, whatever.
* @param stats One [[WriteTaskStats]] object from each successful write task.
+ * @param jobCommitTime Time of committing the job.
* @note The type of @param `stats` is too generic. These classes should probably be parametrized:
* WriteTaskStatsTracker[S <: WriteTaskStats]
* WriteJobStatsTracker[S <: WriteTaskStats, T <: WriteTaskStatsTracker[S]]
@@ -103,5 +105,5 @@ trait WriteJobStatsTracker extends Serializable {
* to the expected derived type when implementing this method in a derived class.
* The framework will make sure to call this with the right arguments.
*/
- def processStats(stats: Seq[WriteTaskStats]): Unit
+ def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
index 7227e48..ead5114 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
@@ -36,7 +36,7 @@ class FileBatchWrite(
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) }
logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
- processStats(description.statsTrackers, results.map(_.summary.stats))
+ processStats(description.statsTrackers, results.map(_.summary.stats), duration)
logInfo(s"Finished processing stats for write job ${description.uuid}.")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
index 0237679..982e428 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
@@ -73,7 +73,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
}
private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = {
- tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
+ tracker.getFinalStats(0L).asInstanceOf[BasicWriteTaskStats]
}
test("No files in run") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
index 82d873a..e9f625b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
class CustomWriteTaskStatsTrackerSuite extends SparkFunSuite {
def checkFinalStats(tracker: CustomWriteTaskStatsTracker, result: Map[String, Int]): Unit = {
- assert(tracker.getFinalStats().asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
+ assert(tracker.getFinalStats(0L).asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
}
test("sequential file writing") {
@@ -64,7 +64,7 @@ class CustomWriteTaskStatsTracker extends WriteTaskStatsTracker {
numRowsPerFile(filePath) += 1
}
- override def getFinalStats(): WriteTaskStats = {
+ override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
CustomWriteTaskStats(numRowsPerFile.toMap)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 9d4fbd7..32428fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -22,6 +22,9 @@ import java.io.File
import scala.reflect.{classTag, ClassTag}
import scala.util.Random
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -29,6 +32,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, SQLHadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.functions._
@@ -790,6 +794,24 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}
+ test("SPARK-34399: Add job commit duration metrics for DataWritingCommand") {
+ withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+ "org.apache.spark.sql.execution.metric.CustomFileCommitProtocol") {
+ withTable("t", "t2") {
+ sql("CREATE TABLE t(id STRING) USING PARQUET")
+ val df = sql("INSERT INTO TABLE t SELECT 'abc'")
+ val insert = df.queryExecution.executedPlan.collect {
+ case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd
+ }
+ assert(insert.size == 1)
+ assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME))
+ assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME))
+ assert(insert.head.metrics(BasicWriteJobStatsTracker.JOB_COMMIT_TIME).value > 0)
+ assert(insert.head.metrics(BasicWriteJobStatsTracker.TASK_COMMIT_TIME).value > 0)
+ }
+ }
+ }
+
test("SPARK-34567: Add metrics for CTAS operator") {
withTable("t") {
val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
@@ -807,3 +829,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}
}
+
+case class CustomFileCommitProtocol(
+ jobId: String,
+ path: String,
+ dynamicPartitionOverwrite: Boolean = false)
+ extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+ override def commitTask(
+ taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
+ Thread.sleep(Random.nextInt(100))
+ super.commitTask(taskContext)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org