Posted to commits@spark.apache.org by we...@apache.org on 2017/11/29 11:19:24 UTC

spark git commit: [SPARK-22605][SQL] SQL write job should also set Spark task output metrics

Repository: spark
Updated Branches:
  refs/heads/master e9b2070ab -> 20b239845


[SPARK-22605][SQL] SQL write job should also set Spark task output metrics

## What changes were proposed in this pull request?

For SQL write jobs, we currently only set metrics for the SQL listener and display them in the SQL plan UI. We should also set the Spark task output metrics, which will then be shown in the Spark job UI.
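
For reference, the core of the fix (full diff below) is to push the write totals into the task's `OutputMetrics` as well. A minimal sketch of that pattern follows; note that `setBytesWritten`/`setRecordsWritten` are `private[spark]`, so this only compiles inside the `org.apache.spark` package tree (as `BasicWriteTaskStatsTracker` is), and `totalBytes`/`totalRows` are hypothetical stand-ins for the tracker's counters:
```
import org.apache.spark.TaskContext

// Hypothetical counters standing in for the tracker's accumulated stats.
val totalBytes: Long = 123456L
val totalRows: Long = 1000L

// TaskContext.get() returns null outside of a running task (e.g. on the
// driver), so guard with Option before touching the task metrics.
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { om =>
  om.setBytesWritten(totalBytes)
  om.setRecordsWritten(totalRows)
}
```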

## How was this patch tested?

Tested manually. For a simple write job:
```
spark.range(1000).write.parquet("/tmp/p1")
```
the Spark job UI now looks like this:
![ui](https://user-images.githubusercontent.com/3182036/33326478-05a25b7c-d490-11e7-96ef-806117774356.jpg)
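
For a programmatic check instead of eyeballing the UI, a `SparkListener` can observe the same metrics on task completion. A minimal sketch for a spark-shell session, where `spark` is in scope (the listener class and its name are illustrative, not part of this patch):
```
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Prints each finished task's output metrics, which this patch now
// populates for SQL write jobs.
class OutputMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics   // may be null for failed tasks
    if (metrics != null && metrics.outputMetrics.recordsWritten > 0) {
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"bytes=${metrics.outputMetrics.bytesWritten}, " +
        s"records=${metrics.outputMetrics.recordsWritten}")
    }
  }
}

spark.sparkContext.addSparkListener(new OutputMetricsListener)
spark.range(1000).write.parquet("/tmp/p1")
```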

Author: Wenchen Fan <we...@databricks.com>

Closes #19833 from cloud-fan/ui.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20b23984
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20b23984
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20b23984

Branch: refs/heads/master
Commit: 20b239845b695fe6a893ebfe97b49ef05fae773d
Parents: e9b2070
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Nov 29 19:18:47 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Nov 29 19:18:47 2017 +0800

----------------------------------------------------------------------
 .../execution/datasources/BasicWriteStatsTracker.scala    | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20b23984/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 11af0aa..9dbbe99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -22,7 +22,7 @@ import java.io.FileNotFoundException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
@@ -44,7 +44,6 @@ case class BasicWriteTaskStats(
 
 /**
  * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
- * @param hadoopConf
  */
 class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   extends WriteTaskStatsTracker with Logging {
@@ -106,6 +105,13 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
 
   override def getFinalStats(): WriteTaskStats = {
     statCurrentFile()
+
+    // Reports bytesWritten and recordsWritten to the Spark output metrics.
+    Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
+      outputMetrics.setBytesWritten(numBytes)
+      outputMetrics.setRecordsWritten(numRows)
+    }
+
     if (submittedFiles != numFiles) {
       logInfo(s"Expected $submittedFiles files, but only saw $numFiles. " +
         "This could be due to the output format not writing empty files, " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org