You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/07 05:12:26 UTC

spark git commit: [SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations

Repository: spark
Updated Branches:
  refs/heads/master c09b31eb8 -> 5df99bd36


[SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations

## What changes were proposed in this pull request?

Remove the time metrics, since there seems to be no way to measure them without per-row tracking.

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #18558 from viirya/SPARK-20703-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5df99bd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5df99bd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5df99bd3

Branch: refs/heads/master
Commit: 5df99bd364561c6f4c02308149ba5eb71f89247e
Parents: c09b31e
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri Jul 7 13:12:20 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jul 7 13:12:20 2017 +0800

----------------------------------------------------------------------
 .../execution/command/DataWritingCommand.scala  | 10 ---------
 .../datasources/FileFormatWriter.scala          | 22 +++-----------------
 .../sql/hive/execution/SQLMetricsSuite.scala    |  3 ---
 3 files changed, 3 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0c381a2..700f7f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -30,7 +30,6 @@ trait DataWritingCommand extends RunnableCommand {
   override lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
-      "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
       "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -47,23 +46,14 @@ trait DataWritingCommand extends RunnableCommand {
     var numFiles = 0
     var totalNumBytes: Long = 0L
     var totalNumOutput: Long = 0L
-    var totalWritingTime: Long = 0L
 
     writeSummaries.foreach { summary =>
       numPartitions += summary.updatedPartitions.size
       numFiles += summary.numOutputFile
       totalNumBytes += summary.numOutputBytes
       totalNumOutput += summary.numOutputRows
-      totalWritingTime += summary.totalWritingTime
     }
 
-    val avgWritingTime = if (numFiles > 0) {
-      (totalWritingTime / numFiles).toLong
-    } else {
-      0L
-    }
-
-    metrics("avgTime").add(avgWritingTime)
     metrics("numFiles").add(numFiles)
     metrics("numOutputBytes").add(totalNumBytes)
     metrics("numOutputRows").add(totalNumOutput)

http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6486663..9eb9eae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -275,8 +275,6 @@ object FileFormatWriter extends Logging {
     /**
      * The data structures used to measure metrics during writing.
      */
-    protected var totalWritingTime: Long = 0L
-    protected var timeOnCurrentFile: Long = 0L
     protected var numOutputRows: Long = 0L
     protected var numOutputBytes: Long = 0L
 
@@ -343,9 +341,7 @@ object FileFormatWriter extends Logging {
         }
 
         val internalRow = iter.next()
-        val startTime = System.nanoTime()
         currentWriter.write(internalRow)
-        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       releaseResources()
@@ -355,17 +351,13 @@ object FileFormatWriter extends Logging {
         updatedPartitions = Set.empty,
         numOutputFile = fileCounter + 1,
         numOutputBytes = numOutputBytes,
-        numOutputRows = numOutputRows,
-        totalWritingTime = totalWritingTime)
+        numOutputRows = numOutputRows)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
-          val startTime = System.nanoTime()
           currentWriter.close()
-          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
-          timeOnCurrentFile = 0
           numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
@@ -504,9 +496,7 @@ object FileFormatWriter extends Logging {
           releaseResources()
           newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions)
         }
-        val startTime = System.nanoTime()
         currentWriter.write(getOutputRow(row))
-        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       if (currentPartColsAndBucketId != null) {
@@ -519,17 +509,13 @@ object FileFormatWriter extends Logging {
         updatedPartitions = updatedPartitions.toSet,
         numOutputFile = totalFileCounter,
         numOutputBytes = numOutputBytes,
-        numOutputRows = numOutputRows,
-        totalWritingTime = totalWritingTime)
+        numOutputRows = numOutputRows)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
-          val startTime = System.nanoTime()
           currentWriter.close()
-          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
-          timeOnCurrentFile = 0
           numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
@@ -547,11 +533,9 @@ object FileFormatWriter extends Logging {
  * @param numOutputFile the total number of files.
  * @param numOutputRows the number of output rows.
  * @param numOutputBytes the bytes of output data.
- * @param totalWritingTime the total writing time in ms.
  */
 case class ExecutedWriteSummary(
   updatedPartitions: Set[String],
   numOutputFile: Int,
   numOutputRows: Long,
-  numOutputBytes: Long,
-  totalWritingTime: Long)
+  numOutputBytes: Long)

http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
index 1ef1988..24c0385 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -65,9 +65,6 @@ class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
     val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
     val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
     assert(totalNumBytes > 0)
-    val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get
-    val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt
-    assert(writingTime >= 0)
   }
 
   private def testMetricsNonDynamicPartition(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org