Posted to commits@spark.apache.org by jo...@apache.org on 2016/01/19 04:22:42 UTC

spark git commit: [SPARK-12885][MINOR] Rename 3 fields in ShuffleWriteMetrics

Repository: spark
Updated Branches:
  refs/heads/master 323d51f1d -> 2b5d11f34


[SPARK-12885][MINOR] Rename 3 fields in ShuffleWriteMetrics

This is a small step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. This patch is strictly a cleanup and introduces no change in functionality; it just renames three fields for consistency. Today we have:

```
inputMetrics.recordsRead
outputMetrics.bytesWritten
shuffleReadMetrics.localBlocksFetched
...
shuffleWriteMetrics.shuffleRecordsWritten
shuffleWriteMetrics.shuffleBytesWritten
shuffleWriteMetrics.shuffleWriteTime
```

The shuffle write ones are redundant, so we can drop the `shuffle` part from the method names. I added backward-compatible (but deprecated) methods with the old names; a short usage sketch follows below.
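
For illustration only, here is a minimal sketch of what the rename means for a caller, assuming a `ShuffleWriteMetrics` instance; the `writeMetrics` value below is hypothetical (in practice it comes from `TaskMetrics`), and the deprecated aliases simply forward to the new accessors:

```
import org.apache.spark.executor.ShuffleWriteMetrics

// Hypothetical instance for illustration; normally obtained via TaskMetrics.
val writeMetrics = new ShuffleWriteMetrics

// Old accessors still compile, but emit deprecation warnings...
val oldBytes: Long = writeMetrics.shuffleBytesWritten

// ...while new code should use the shorter, consistent names.
val bytes: Long   = writeMetrics.bytesWritten
val time: Long    = writeMetrics.writeTime
val records: Long = writeMetrics.recordsWritten
```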

Parent PR: #10717

Author: Andrew Or <an...@databricks.com>

Closes #10811 from andrewor14/rename-things.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b5d11f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b5d11f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b5d11f3

Branch: refs/heads/master
Commit: 2b5d11f34d73eb7117c0c4668c1abb27dcc3a403
Parents: 323d51f
Author: Andrew Or <an...@databricks.com>
Authored: Mon Jan 18 19:22:29 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Jan 18 19:22:29 2016 -0800

----------------------------------------------------------------------
 .../sort/BypassMergeSortShuffleWriter.java      |  4 +-
 .../shuffle/sort/ShuffleExternalSorter.java     |  4 +-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java |  6 +--
 .../spark/storage/TimeTrackingOutputStream.java | 10 ++--
 .../spark/executor/ShuffleWriteMetrics.scala    | 37 +++++++++-----
 .../apache/spark/scheduler/SparkListener.scala  |  2 +-
 .../shuffle/FileShuffleBlockResolver.scala      |  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  2 +-
 .../spark/status/api/v1/AllStagesResource.scala | 12 ++---
 .../spark/storage/DiskBlockObjectWriter.scala   | 12 ++---
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |  2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  8 +--
 .../org/apache/spark/ui/jobs/StagePage.scala    | 14 ++---
 .../org/apache/spark/util/JsonProtocol.scala    | 13 ++---
 .../util/collection/ExternalAppendOnlyMap.scala |  4 +-
 .../spark/util/collection/ExternalSorter.scala  |  4 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  | 20 ++++----
 .../scala/org/apache/spark/ShuffleSuite.scala   |  4 +-
 .../spark/metrics/InputOutputMetricsSuite.scala |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  2 +-
 .../BypassMergeSortShuffleWriterSuite.scala     |  8 +--
 .../storage/DiskBlockObjectWriterSuite.scala    | 54 ++++++++++----------
 .../ui/jobs/JobProgressListenerSuite.scala      |  2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 12 ++---
 24 files changed, 126 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 56cdc22..a06dc1c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -143,7 +143,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, and can take a long time in aggregate when we open many files, so should be
     // included in the shuffle write time.
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
@@ -203,7 +203,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       threwException = false;
     } finally {
       Closeables.close(out, threwException);
-      writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
     }
     partitionWriters = null;
     return lengths;

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 9affff8..2c84de5 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -233,8 +233,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
       // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
       // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
-      writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
-      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 744c300..c8cc705 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -298,8 +298,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
         // to be counted as shuffle write, but this will lead to double-counting of the final
         // SpillInfo's bytes.
-        writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
-        writeMetrics.incShuffleBytesWritten(outputFile.length());
+        writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
+        writeMetrics.incBytesWritten(outputFile.length());
         return partitionLengths;
       }
     } catch (IOException e) {
@@ -411,7 +411,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
             spillInputChannelPositions[i] += actualBytesTransferred;
             bytesToTransfer -= actualBytesTransferred;
           }
-          writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
           bytesWrittenToMergedFile += partitionLengthInSpill;
           partitionLengths[partition] += partitionLengthInSpill;
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
index dc2aa30..5d0555a 100644
--- a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
+++ b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
@@ -42,34 +42,34 @@ public final class TimeTrackingOutputStream extends OutputStream {
   public void write(int b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void write(byte[] b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b, off, len);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void flush() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.flush();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 
   @Override
   public void close() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.close();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 469ebe2..24795f8 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -26,28 +26,39 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 class ShuffleWriteMetrics extends Serializable {
+
   /**
    * Number of bytes written for the shuffle by this task
    */
-  @volatile private var _shuffleBytesWritten: Long = _
-  def shuffleBytesWritten: Long = _shuffleBytesWritten
-  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
-  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+  @volatile private var _bytesWritten: Long = _
+  def bytesWritten: Long = _bytesWritten
+  private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
+  private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value
 
   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
    */
-  @volatile private var _shuffleWriteTime: Long = _
-  def shuffleWriteTime: Long = _shuffleWriteTime
-  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
-  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+  @volatile private var _writeTime: Long = _
+  def writeTime: Long = _writeTime
+  private[spark] def incWriteTime(value: Long) = _writeTime += value
+  private[spark] def decWriteTime(value: Long) = _writeTime -= value
 
   /**
    * Total number of records written to the shuffle by this task
    */
-  @volatile private var _shuffleRecordsWritten: Long = _
-  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
-  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
-  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
-  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
+  @volatile private var _recordsWritten: Long = _
+  def recordsWritten: Long = _recordsWritten
+  private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
+  private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
+  private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
+
+  // Legacy methods for backward compatibility.
+  // TODO: remove these once we make this class private.
+  @deprecated("use bytesWritten instead", "2.0.0")
+  def shuffleBytesWritten: Long = bytesWritten
+  @deprecated("use writeTime instead", "2.0.0")
+  def shuffleWriteTime: Long = writeTime
+  @deprecated("use recordsWritten instead", "2.0.0")
+  def shuffleRecordsWritten: Long = recordsWritten
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3130a65..f5267f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -271,7 +271,7 @@ class StatsReportListener extends SparkListener with Logging {
 
     // Shuffle write
     showBytesDistribution("shuffle bytes written:",
-      (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
+      (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)
 
     // Fetch & I/O
     showMillisDistribution("fetch wait time:",

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 294e16c..2970968 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -90,7 +90,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
       }
       // Creating the file to write to and creating a disk writer both involve interacting with
       // the disk, so should be included in the shuffle write time.
-      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
+      writeMetrics.incWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
         shuffleState.completedMapTasks.add(mapId)

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index f83cf88..5c5a5f5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,7 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         val startTime = System.nanoTime()
         sorter.stop()
         context.taskMetrics.shuffleWriteMetrics.foreach(
-          _.incShuffleWriteTime(System.nanoTime - startTime))
+          _.incWriteTime(System.nanoTime - startTime))
         sorter = null
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 341ae78..078718b 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -214,9 +214,9 @@ private[v1] object AllStagesResource {
           raw.shuffleWriteMetrics
         }
         def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
-          writeBytes = submetricQuantiles(_.shuffleBytesWritten),
-          writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
-          writeTime = submetricQuantiles(_.shuffleWriteTime)
+          writeBytes = submetricQuantiles(_.bytesWritten),
+          writeRecords = submetricQuantiles(_.recordsWritten),
+          writeTime = submetricQuantiles(_.writeTime)
         )
       }.metricOption
 
@@ -283,9 +283,9 @@ private[v1] object AllStagesResource {
 
   def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
     new ShuffleWriteMetrics(
-      bytesWritten = internal.shuffleBytesWritten,
-      writeTime = internal.shuffleWriteTime,
-      recordsWritten = internal.shuffleRecordsWritten
+      bytesWritten = internal.bytesWritten,
+      writeTime = internal.writeTime,
+      recordsWritten = internal.recordsWritten
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index e36a367..c34d49c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -102,7 +102,7 @@ private[spark] class DiskBlockObjectWriter(
           objOut.flush()
           val start = System.nanoTime()
           fos.getFD.sync()
-          writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
+          writeMetrics.incWriteTime(System.nanoTime() - start)
         }
       } {
         objOut.close()
@@ -132,7 +132,7 @@ private[spark] class DiskBlockObjectWriter(
       close()
       finalPosition = file.length()
       // In certain compression codecs, more bytes are written after close() is called
-      writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
+      writeMetrics.incBytesWritten(finalPosition - reportedPosition)
     } else {
       finalPosition = file.length()
     }
@@ -152,8 +152,8 @@ private[spark] class DiskBlockObjectWriter(
     // truncating the file to its initial position.
     try {
       if (initialized) {
-        writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
-        writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
+        writeMetrics.decBytesWritten(reportedPosition - initialPosition)
+        writeMetrics.decRecordsWritten(numRecordsWritten)
         objOut.flush()
         bs.flush()
         close()
@@ -201,7 +201,7 @@ private[spark] class DiskBlockObjectWriter(
    */
   def recordWritten(): Unit = {
     numRecordsWritten += 1
-    writeMetrics.incShuffleRecordsWritten(1)
+    writeMetrics.incRecordsWritten(1)
 
     if (numRecordsWritten % 32 == 0) {
       updateBytesWritten()
@@ -226,7 +226,7 @@ private[spark] class DiskBlockObjectWriter(
    */
   private def updateBytesWritten() {
     val pos = channel.position()
-    writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
+    writeMetrics.incBytesWritten(pos - reportedPosition)
     reportedPosition = pos
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 2d955a6..160d7a4 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -129,7 +129,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
         }
         metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
           executorToShuffleWrite(eid) =
-            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index ca37829..4a9f8b3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -426,14 +426,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
 
     val shuffleWriteDelta =
-      (taskMetrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
-      - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
+      (taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L)
+      - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L))
     stageData.shuffleWriteBytes += shuffleWriteDelta
     execSummary.shuffleWrite += shuffleWriteDelta
 
     val shuffleWriteRecordsDelta =
-      (taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
-      - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L))
+      (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L)
+      - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L))
     stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
     execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 6d4066a..914f618 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -500,11 +500,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
             getFormattedSizeQuantiles(shuffleReadRemoteSizes)
 
           val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
-            metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+            metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
           }
 
           val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
-            metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
+            metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
           }
 
           val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
@@ -619,7 +619,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         val shuffleReadTimeProportion = toProportion(shuffleReadTime)
         val shuffleWriteTime =
           (metricsOpt.flatMap(_.shuffleWriteMetrics
-            .map(_.shuffleWriteTime)).getOrElse(0L) / 1e6).toLong
+            .map(_.writeTime)).getOrElse(0L) / 1e6).toLong
         val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)
 
         val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
@@ -930,13 +930,13 @@ private[ui] class TaskDataSource(
     val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
 
     val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
-    val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L)
+    val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
     val shuffleWriteReadable = maybeShuffleWrite
-      .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+      .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
     val shuffleWriteRecords = maybeShuffleWrite
-      .map(_.shuffleRecordsWritten.toString).getOrElse("")
+      .map(_.recordsWritten.toString).getOrElse("")
 
-    val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
+    val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime)
     val writeTimeSortable = maybeWriteTime.getOrElse(0L)
     val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
       if (ms == 0) "" else UIUtils.formatDuration(ms)

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a6460bc..b88221a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -331,10 +331,11 @@ private[spark] object JsonProtocol {
     ("Total Records Read" -> shuffleReadMetrics.recordsRead)
   }
 
+  // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
   def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
-    ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~
-    ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~
-    ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten)
+    ("Shuffle Bytes Written" -> shuffleWriteMetrics.bytesWritten) ~
+    ("Shuffle Write Time" -> shuffleWriteMetrics.writeTime) ~
+    ("Shuffle Records Written" -> shuffleWriteMetrics.recordsWritten)
   }
 
   def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
@@ -752,9 +753,9 @@ private[spark] object JsonProtocol {
 
   def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
     val metrics = new ShuffleWriteMetrics
-    metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
-    metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
-    metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written")
+    metrics.incBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
+    metrics.incWriteTime((json \ "Shuffle Write Time").extract[Long])
+    metrics.setRecordsWritten((json \ "Shuffle Records Written")
       .extractOpt[Long].getOrElse(0))
     metrics
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4a44481..ff9dad7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -193,8 +193,8 @@ class ExternalAppendOnlyMap[K, V, C](
       val w = writer
       writer = null
       w.commitAndClose()
-      _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten
-      batchSizes.append(curWriteMetrics.shuffleBytesWritten)
+      _diskBytesSpilled += curWriteMetrics.bytesWritten
+      batchSizes.append(curWriteMetrics.bytesWritten)
       objectsWritten = 0
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 63ba954..4c7416e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -262,8 +262,8 @@ private[spark] class ExternalSorter[K, V, C](
       val w = writer
       writer = null
       w.commitAndClose()
-      _diskBytesSpilled += spillMetrics.shuffleBytesWritten
-      batchSizes.append(spillMetrics.shuffleBytesWritten)
+      _diskBytesSpilled += spillMetrics.bytesWritten
+      batchSizes.append(spillMetrics.bytesWritten)
       spillMetrics = null
       objectsWritten = 0
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 5fe64bd..625fdd5 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -279,8 +279,8 @@ public class UnsafeShuffleWriterSuite {
     assertTrue(mapStatus.isDefined());
     assertTrue(mergedOutputFile.exists());
     assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
-    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten());
-    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleBytesWritten());
+    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().recordsWritten());
+    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().bytesWritten());
     assertEquals(0, taskMetrics.diskBytesSpilled());
     assertEquals(0, taskMetrics.memoryBytesSpilled());
   }
@@ -311,10 +311,10 @@ public class UnsafeShuffleWriterSuite {
       HashMultiset.create(readRecordsFromFile()));
     assertSpillFilesWereCleanedUp();
     ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
-    assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+    assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
     assertEquals(0, taskMetrics.diskBytesSpilled());
     assertEquals(0, taskMetrics.memoryBytesSpilled());
-    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
   }
 
   private void testMergingSpills(
@@ -354,11 +354,11 @@ public class UnsafeShuffleWriterSuite {
     assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
     assertSpillFilesWereCleanedUp();
     ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
-    assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+    assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
     assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
     assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
     assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
-    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
   }
 
   @Test
@@ -416,11 +416,11 @@ public class UnsafeShuffleWriterSuite {
     readRecordsFromFile();
     assertSpillFilesWereCleanedUp();
     ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
-    assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+    assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
     assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
     assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
     assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
-    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
   }
 
   @Test
@@ -437,11 +437,11 @@ public class UnsafeShuffleWriterSuite {
     readRecordsFromFile();
     assertSpillFilesWereCleanedUp();
     ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
-    assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
+    assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
     assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
     assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
     assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
-    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten());
+    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index c45d814..6ffa1c8 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -450,8 +450,8 @@ object ShuffleSuite {
     val listener = new SparkListener {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
         taskEnd.taskMetrics.shuffleWriteMetrics.foreach { m =>
-          recordsWritten += m.shuffleRecordsWritten
-          bytesWritten += m.shuffleBytesWritten
+          recordsWritten += m.recordsWritten
+          bytesWritten += m.bytesWritten
         }
         taskEnd.taskMetrics.shuffleReadMetrics.foreach { m =>
           recordsRead += m.recordsRead

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index aaf62e0..e5a4482 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -212,7 +212,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
         metrics.inputMetrics.foreach(inputRead += _.recordsRead)
         metrics.outputMetrics.foreach(outputWritten += _.recordsWritten)
         metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead)
-        metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten)
+        metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten)
       }
     })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index dc15f59..c87158d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -269,7 +269,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
           taskMetrics.inputMetrics should not be ('defined)
           taskMetrics.outputMetrics should not be ('defined)
           taskMetrics.shuffleWriteMetrics should be ('defined)
-          taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0L)
+          taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L)
         }
         if (stageInfo.rddInfos.exists(_.name == d4.name)) {
           taskMetrics.shuffleReadMetrics should be ('defined)

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index ef6ce04..fdacd8c 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -145,8 +145,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     assert(outputFile.length() === 0)
     assert(temporaryFilesCreated.isEmpty)
     val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
-    assert(shuffleWriteMetrics.shuffleBytesWritten === 0)
-    assert(shuffleWriteMetrics.shuffleRecordsWritten === 0)
+    assert(shuffleWriteMetrics.bytesWritten === 0)
+    assert(shuffleWriteMetrics.recordsWritten === 0)
     assert(taskMetrics.diskBytesSpilled === 0)
     assert(taskMetrics.memoryBytesSpilled === 0)
   }
@@ -169,8 +169,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
     assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
     val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
-    assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
-    assert(shuffleWriteMetrics.shuffleRecordsWritten === records.length)
+    assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
+    assert(shuffleWriteMetrics.recordsWritten === records.length)
     assert(taskMetrics.diskBytesSpilled === 0)
     assert(taskMetrics.memoryBytesSpilled === 0)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 5d36617..8eff3c2 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -50,18 +50,18 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
-    assert(writeMetrics.shuffleRecordsWritten === 1)
+    assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
-    assert(writeMetrics.shuffleBytesWritten == 0)
+    assert(writeMetrics.bytesWritten == 0)
     // After 32 writes, metrics should update
     for (i <- 0 until 32) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
-    assert(writeMetrics.shuffleBytesWritten > 0)
-    assert(writeMetrics.shuffleRecordsWritten === 33)
+    assert(writeMetrics.bytesWritten > 0)
+    assert(writeMetrics.recordsWritten === 33)
     writer.commitAndClose()
-    assert(file.length() == writeMetrics.shuffleBytesWritten)
+    assert(file.length() == writeMetrics.bytesWritten)
   }
 
   test("verify write metrics on revert") {
@@ -72,19 +72,19 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
-    assert(writeMetrics.shuffleRecordsWritten === 1)
+    assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
-    assert(writeMetrics.shuffleBytesWritten == 0)
+    assert(writeMetrics.bytesWritten == 0)
     // After 32 writes, metrics should update
     for (i <- 0 until 32) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
-    assert(writeMetrics.shuffleBytesWritten > 0)
-    assert(writeMetrics.shuffleRecordsWritten === 33)
+    assert(writeMetrics.bytesWritten > 0)
+    assert(writeMetrics.recordsWritten === 33)
     writer.revertPartialWritesAndClose()
-    assert(writeMetrics.shuffleBytesWritten == 0)
-    assert(writeMetrics.shuffleRecordsWritten == 0)
+    assert(writeMetrics.bytesWritten == 0)
+    assert(writeMetrics.recordsWritten == 0)
   }
 
   test("Reopening a closed block writer") {
@@ -109,11 +109,11 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
       writer.write(i, i)
     }
     writer.commitAndClose()
-    val bytesWritten = writeMetrics.shuffleBytesWritten
-    assert(writeMetrics.shuffleRecordsWritten === 1000)
+    val bytesWritten = writeMetrics.bytesWritten
+    assert(writeMetrics.recordsWritten === 1000)
     writer.revertPartialWritesAndClose()
-    assert(writeMetrics.shuffleRecordsWritten === 1000)
-    assert(writeMetrics.shuffleBytesWritten === bytesWritten)
+    assert(writeMetrics.recordsWritten === 1000)
+    assert(writeMetrics.bytesWritten === bytesWritten)
   }
 
   test("commitAndClose() should be idempotent") {
@@ -125,13 +125,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
       writer.write(i, i)
     }
     writer.commitAndClose()
-    val bytesWritten = writeMetrics.shuffleBytesWritten
-    val writeTime = writeMetrics.shuffleWriteTime
-    assert(writeMetrics.shuffleRecordsWritten === 1000)
+    val bytesWritten = writeMetrics.bytesWritten
+    val writeTime = writeMetrics.writeTime
+    assert(writeMetrics.recordsWritten === 1000)
     writer.commitAndClose()
-    assert(writeMetrics.shuffleRecordsWritten === 1000)
-    assert(writeMetrics.shuffleBytesWritten === bytesWritten)
-    assert(writeMetrics.shuffleWriteTime === writeTime)
+    assert(writeMetrics.recordsWritten === 1000)
+    assert(writeMetrics.bytesWritten === bytesWritten)
+    assert(writeMetrics.writeTime === writeTime)
   }
 
   test("revertPartialWritesAndClose() should be idempotent") {
@@ -143,13 +143,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
       writer.write(i, i)
     }
     writer.revertPartialWritesAndClose()
-    val bytesWritten = writeMetrics.shuffleBytesWritten
-    val writeTime = writeMetrics.shuffleWriteTime
-    assert(writeMetrics.shuffleRecordsWritten === 0)
+    val bytesWritten = writeMetrics.bytesWritten
+    val writeTime = writeMetrics.writeTime
+    assert(writeMetrics.recordsWritten === 0)
     writer.revertPartialWritesAndClose()
-    assert(writeMetrics.shuffleRecordsWritten === 0)
-    assert(writeMetrics.shuffleBytesWritten === bytesWritten)
-    assert(writeMetrics.shuffleWriteTime === writeTime)
+    assert(writeMetrics.recordsWritten === 0)
+    assert(writeMetrics.bytesWritten === bytesWritten)
+    assert(writeMetrics.writeTime === writeTime)
   }
 
   test("fileSegment() can only be called after commitAndClose() has been called") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e02f5a1..ee2d56a 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -277,7 +277,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
       shuffleReadMetrics.incRemoteBytesRead(base + 1)
       shuffleReadMetrics.incLocalBytesRead(base + 9)
       shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
-      shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
+      shuffleWriteMetrics.incBytesWritten(base + 3)
       taskMetrics.setExecutorRunTime(base + 4)
       taskMetrics.incDiskBytesSpilled(base + 5)
       taskMetrics.incMemoryBytesSpilled(base + 6)

http://git-wip-us.apache.org/repos/asf/spark/blob/2b5d11f3/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 068e839..9dd400f 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -227,7 +227,7 @@ class JsonProtocolSuite extends SparkFunSuite {
                          .removeField { case (field, _) => field == "Shuffle Records Written" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
-    assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0)
+    assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0)
   }
 
   test("OutputMetrics backward compatibility") {
@@ -568,8 +568,8 @@ class JsonProtocolSuite extends SparkFunSuite {
   }
 
   private def assertEquals(metrics1: ShuffleWriteMetrics, metrics2: ShuffleWriteMetrics) {
-    assert(metrics1.shuffleBytesWritten === metrics2.shuffleBytesWritten)
-    assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
+    assert(metrics1.bytesWritten === metrics2.bytesWritten)
+    assert(metrics1.writeTime === metrics2.writeTime)
   }
 
   private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
@@ -794,9 +794,9 @@ class JsonProtocolSuite extends SparkFunSuite {
       t.outputMetrics = Some(outputMetrics)
     } else {
       val sw = new ShuffleWriteMetrics
-      sw.incShuffleBytesWritten(a + b + c)
-      sw.incShuffleWriteTime(b + c + d)
-      sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
+      sw.incBytesWritten(a + b + c)
+      sw.incWriteTime(b + c + d)
+      sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
       t.shuffleWriteMetrics = Some(sw)
     }
     // Make at most 6 blocks

