Posted to commits@spark.apache.org by pw...@apache.org on 2015/01/19 10:32:37 UTC

spark git commit: [SPARK-3288] All fields in TaskMetrics should be private and use getters/setters

Repository: spark
Updated Branches:
  refs/heads/master 851b6a9bb -> 3453d578a


[SPARK-3288] All fields in TaskMetrics should be private and use getters/setters

I've updated these fields and all of their usages throughout the Spark codebase, and verified locally that the change does not break anything. A short sketch of the resulting accessor pattern follows the list of squashed commits below.

Author: Ilya Ganelin <il...@capitalone.com>

Closes #4020 from ilganeli/SPARK-3288 and squashes the following commits:

39f3810 [Ilya Ganelin] resolved merge issues
e446287 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3288
b8c05cb [Ilya Ganelin] Missed making a variable private
6444391 [Ilya Ganelin] Made inc/dec functions private[spark]
1149e78 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3288
26b312b [Ilya Ganelin] Debugging tests
17146c2 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3288
5525c20 [Ilya Ganelin] Completed refactoring to make vars in TaskMetrics class private
c64da4f [Ilya Ganelin] Partially updated task metrics to make some vars private
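
For readers skimming the diff below, here is a minimal sketch of the accessor pattern this patch applies throughout TaskMetrics. The class and package names are illustrative only, not the actual Spark source: each public var becomes a private backing field with a public getter, plus private[spark] setters or inc/dec helpers for mutation.

// Illustrative sketch only -- not the real TaskMetrics. The package must sit
// under org.apache.spark for private[spark] to be a legal access modifier.
package org.apache.spark.example

class ExampleMetrics extends Serializable {
  // Backing field is private; callers can no longer assign it directly.
  private var _memoryBytesSpilled: Long = 0L

  // Read access stays public so the UI and event listeners keep working.
  def memoryBytesSpilled: Long = _memoryBytesSpilled

  // Mutation is limited to Spark internals, replacing the old
  // `metrics.memoryBytesSpilled += n` call sites with inc/dec helpers.
  private[spark] def incMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled += value
  private[spark] def decMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled -= value

  // Set-once fields (e.g. hostname) get a plain private[spark] setter instead.
  private var _hostname: String = _
  def hostname: String = _hostname
  private[spark] def setHostname(value: String): Unit = _hostname = value
}

A Spark-internal caller then writes metrics.incMemoryBytesSpilled(spilled) rather than mutating the field directly, which is exactly the call-site change visible in the hunks below.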


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3453d578
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3453d578
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3453d578

Branch: refs/heads/master
Commit: 3453d578ad9933be6881488c8ca3611e5b686af9
Parents: 851b6a9
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Mon Jan 19 01:32:22 2015 -0800
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Mon Jan 19 01:32:36 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |   8 +-
 .../org/apache/spark/api/python/PythonRDD.scala |   4 +-
 .../org/apache/spark/executor/Executor.scala    |  19 ++--
 .../org/apache/spark/executor/TaskMetrics.scala | 103 +++++++++++++------
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |   4 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   1 +
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   1 +
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   6 +-
 .../scala/org/apache/spark/scheduler/Task.scala |   2 +-
 .../spark/scheduler/TaskResultGetter.scala      |   2 +-
 .../spark/shuffle/hash/HashShuffleReader.scala  |   4 +-
 .../spark/storage/BlockObjectWriter.scala       |   8 +-
 .../storage/ShuffleBlockFetcherIterator.scala   |   8 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  30 +++---
 .../spark/util/collection/ExternalSorter.scala  |   8 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  16 +--
 .../apache/spark/util/JsonProtocolSuite.scala   |  28 ++---
 17 files changed, 149 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 09eb960..3b684bb 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
       // Update task metrics if context is not null
       // TODO: Make context non optional in a future release
       Option(context).foreach { c =>
-        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
-        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+        c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+        c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
       }
       combiners.iterator
     }
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
       // Update task metrics if context is not null
       // TODO: Make context non-optional in a future release
       Option(context).foreach { c =>
-        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
-        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+        c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+        c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
       }
       combiners.iterator
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index bad40e6..4ac666c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -125,8 +125,8 @@ private[spark] class PythonRDD(
                 init, finish))
               val memoryBytesSpilled = stream.readLong()
               val diskBytesSpilled = stream.readLong()
-              context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
-              context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+              context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
+              context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
               read()
             case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
               // Signals that an exception has been thrown in python

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6660b98..42566d1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -203,10 +203,10 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.executorDeserializeTime = taskStart - deserializeStartTime
-          m.executorRunTime = taskFinish - taskStart
-          m.jvmGCTime = gcTime - startGCTime
-          m.resultSerializationTime = afterSerialization - beforeSerialization
+          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
+          m.setExecutorRunTime(taskFinish - taskStart)
+          m.setJvmGCTime(gcTime - startGCTime)
+          m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
 
         val accumUpdates = Accumulators.values
@@ -257,8 +257,8 @@ private[spark] class Executor(
           val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
-            m.executorRunTime = serviceTime
-            m.jvmGCTime = gcTime - startGCTime
+            m.setExecutorRunTime(serviceTime)
+            m.setJvmGCTime(gcTime - startGCTime)
           }
           val reason = new ExceptionFailure(t, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -376,11 +376,12 @@ private[spark] class Executor(
           val curGCTime = gcTime
 
           for (taskRunner <- runningTasks.values()) {
-            if (!taskRunner.attemptedTask.isEmpty) {
+            if (taskRunner.attemptedTask.nonEmpty) {
               Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
-                metrics.updateShuffleReadMetrics
+                metrics.updateShuffleReadMetrics()
                 metrics.updateInputMetrics()
-                metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
+                metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
+
                 if (isLocal) {
                   // JobProgressListener will hold an reference of it during
                   // onExecutorMetricsUpdate(), then JobProgressListener can not see

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 7eb10f9..ddb5903 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,42 +44,62 @@ class TaskMetrics extends Serializable {
   /**
    * Host's name the task runs on
    */
-  var hostname: String = _
-
+  private var _hostname: String = _
+  def hostname = _hostname
+  private[spark] def setHostname(value: String) = _hostname = value
+  
   /**
    * Time taken on the executor to deserialize this task
    */
-  var executorDeserializeTime: Long = _
-
+  private var _executorDeserializeTime: Long = _
+  def executorDeserializeTime = _executorDeserializeTime
+  private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
+  
+  
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */
-  var executorRunTime: Long = _
-
+  private var _executorRunTime: Long = _
+  def executorRunTime = _executorRunTime
+  private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value
+  
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
    */
-  var resultSize: Long = _
+  private var _resultSize: Long = _
+  def resultSize = _resultSize
+  private[spark] def setResultSize(value: Long) = _resultSize = value
+
 
   /**
    * Amount of time the JVM spent in garbage collection while executing this task
    */
-  var jvmGCTime: Long = _
+  private var _jvmGCTime: Long = _
+  def jvmGCTime = _jvmGCTime
+  private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value
 
   /**
    * Amount of time spent serializing the task result
    */
-  var resultSerializationTime: Long = _
+  private var _resultSerializationTime: Long = _
+  def resultSerializationTime = _resultSerializationTime
+  private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value
 
   /**
    * The number of in-memory bytes spilled by this task
    */
-  var memoryBytesSpilled: Long = _
+  private var _memoryBytesSpilled: Long = _
+  def memoryBytesSpilled = _memoryBytesSpilled
+  private[spark] def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
+  private[spark] def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value
 
   /**
    * The number of on-disk bytes spilled by this task
    */
-  var diskBytesSpilled: Long = _
+  private var _diskBytesSpilled: Long = _
+  def diskBytesSpilled = _diskBytesSpilled
+  def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
+  def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value
 
   /**
    * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
@@ -178,10 +198,10 @@ class TaskMetrics extends Serializable {
   private[spark] def updateShuffleReadMetrics() = synchronized {
     val merged = new ShuffleReadMetrics()
     for (depMetrics <- depsShuffleReadMetrics) {
-      merged.fetchWaitTime += depMetrics.fetchWaitTime
-      merged.localBlocksFetched += depMetrics.localBlocksFetched
-      merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
-      merged.remoteBytesRead += depMetrics.remoteBytesRead
+      merged.incFetchWaitTime(depMetrics.fetchWaitTime)
+      merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
+      merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
+      merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
     }
     _shuffleReadMetrics = Some(merged)
   }
@@ -265,7 +285,9 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
   /**
    * Total bytes written
    */
-  var bytesWritten: Long = 0L
+  private var _bytesWritten: Long = _
+  def bytesWritten = _bytesWritten
+  private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
 }
 
 /**
@@ -275,31 +297,44 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
 @DeveloperApi
 class ShuffleReadMetrics extends Serializable {
   /**
-   * Number of blocks fetched in this shuffle by this task (remote or local)
-   */
-  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
-
-  /**
    * Number of remote blocks fetched in this shuffle by this task
    */
-  var remoteBlocksFetched: Int = _
-
+  private var _remoteBlocksFetched: Int = _
+  def remoteBlocksFetched = _remoteBlocksFetched
+  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
+  private[spark] def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+  
   /**
    * Number of local blocks fetched in this shuffle by this task
    */
-  var localBlocksFetched: Int = _
+  private var _localBlocksFetched: Int = _
+  def localBlocksFetched = _localBlocksFetched
+  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
+  private[spark] def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+
 
   /**
    * Time the task spent waiting for remote shuffle blocks. This only includes the time
    * blocking on shuffle input data. For instance if block B is being fetched while the task is
    * still not finished processing block A, it is not considered to be blocking on block B.
    */
-  var fetchWaitTime: Long = _
-
+  private var _fetchWaitTime: Long = _
+  def fetchWaitTime = _fetchWaitTime
+  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
+  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+  
   /**
    * Total number of remote bytes read from the shuffle by this task
    */
-  var remoteBytesRead: Long = _
+  private var _remoteBytesRead: Long = _
+  def remoteBytesRead = _remoteBytesRead
+  private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
+  private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+
+  /**
+   * Number of blocks fetched in this shuffle by this task (remote or local)
+   */
+  def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
 }
 
 /**
@@ -311,10 +346,18 @@ class ShuffleWriteMetrics extends Serializable {
   /**
    * Number of bytes written for the shuffle by this task
    */
-  @volatile var shuffleBytesWritten: Long = _
-
+  @volatile private var _shuffleBytesWritten: Long = _
+  def shuffleBytesWritten = _shuffleBytesWritten
+  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
+  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+  
   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
    */
-  @volatile var shuffleWriteTime: Long = _
+  @volatile private var _shuffleWriteTime: Long = _
+  def shuffleWriteTime= _shuffleWriteTime
+  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
+  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+  
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 70edf19..07398a6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -159,8 +159,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       for ((it, depNum) <- rddIterators) {
         map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
       }
-      context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled
+      context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
+      context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
       new InterruptibleIterator(context,
         map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3b99d3a..056aef0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -245,6 +245,7 @@ class HadoopRDD[K, V](
           case eof: EOFException =>
             finished = true
         }
+
         (key, value)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 890ec67..7b0e3c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -154,6 +154,7 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
+
         (reader.getCurrentKey, reader.getCurrentValue)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e43e506..0f37d83 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1007,7 +1007,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
-      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+      bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
       1
     } : Int
 
@@ -1079,7 +1079,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         writer.close()
       }
       writer.commit()
-      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+      bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
     }
 
     self.context.runJob(self, writeToFile)
@@ -1102,7 +1102,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
     if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
         && bytesWrittenCallback.isDefined) {
-      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+      bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 2367f7e..847a491 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -55,7 +55,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
     context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
       taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
     TaskContextHelper.setTaskContext(context)
-    context.taskMetrics.hostname = Utils.localHostName()
+    context.taskMetrics.setHostname(Utils.localHostName())
     taskThread = Thread.currentThread()
     if (_killed) {
       kill(interruptThread = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 4896ec8..774f3d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               (deserializedResult, size)
           }
 
-          result.metrics.resultSize = size
+          result.metrics.setResultSize(size)
           scheduler.handleSuccessfulTask(taskSetManager, tid, result)
         } catch {
           case cnf: ClassNotFoundException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index de72148..41bafab 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -59,8 +59,8 @@ private[spark] class HashShuffleReader[K, C](
         // the ExternalSorter won't spill to disk.
         val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
         sorter.insertAll(aggregatedIter)
-        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
-        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
+        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
         sorter.iterator
       case None =>
         aggregatedIter

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 9c46937..3198d76 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -160,14 +160,14 @@ private[spark] class DiskBlockObjectWriter(
     }
     finalPosition = file.length()
     // In certain compression codecs, more bytes are written after close() is called
-    writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
+    writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
   }
 
   // Discard current writes. We do this by flushing the outstanding writes and then
   // truncating the file to its initial position.
   override def revertPartialWritesAndClose() {
     try {
-      writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)
+      writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
 
       if (initialized) {
         objOut.flush()
@@ -212,14 +212,14 @@ private[spark] class DiskBlockObjectWriter(
    */
   private def updateBytesWritten() {
     val pos = channel.position()
-    writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
+    writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
     reportedPosition = pos
   }
 
   private def callWithTiming(f: => Unit) = {
     val start = System.nanoTime()
     f
-    writeMetrics.shuffleWriteTime += (System.nanoTime() - start)
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
   }
 
   // For testing

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 2499c11..ab9ee4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -156,8 +156,8 @@ final class ShuffleBlockFetcherIterator(
             // This needs to be released after use.
             buf.retain()
             results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
-            shuffleMetrics.remoteBytesRead += buf.size
-            shuffleMetrics.remoteBlocksFetched += 1
+            shuffleMetrics.incRemoteBytesRead(buf.size)
+            shuffleMetrics.incRemoteBlocksFetched(1)
           }
           logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
         }
@@ -233,7 +233,7 @@ final class ShuffleBlockFetcherIterator(
       val blockId = iter.next()
       try {
         val buf = blockManager.getBlockData(blockId)
-        shuffleMetrics.localBlocksFetched += 1
+        shuffleMetrics.incLocalBlocksFetched(1)
         buf.retain()
         results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
@@ -277,7 +277,7 @@ final class ShuffleBlockFetcherIterator(
     currentResult = results.take()
     val result = currentResult
     val stopFetchWait = System.currentTimeMillis()
-    shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
+    shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
 
     result match {
       case SuccessFetchResult(_, size, _) => bytesInFlight -= size

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 76709a2..f896b50 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -632,14 +632,14 @@ private[spark] object JsonProtocol {
       return TaskMetrics.empty
     }
     val metrics = new TaskMetrics
-    metrics.hostname = (json \ "Host Name").extract[String]
-    metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
-    metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
-    metrics.resultSize = (json \ "Result Size").extract[Long]
-    metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
-    metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
-    metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
-    metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
+    metrics.setHostname((json \ "Host Name").extract[String])
+    metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+    metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
+    metrics.setResultSize((json \ "Result Size").extract[Long])
+    metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
+    metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
+    metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
+    metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
     metrics.setShuffleReadMetrics(
       Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
     metrics.shuffleWriteMetrics =
@@ -661,17 +661,17 @@ private[spark] object JsonProtocol {
 
   def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
     val metrics = new ShuffleReadMetrics
-    metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
-    metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
-    metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
-    metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long]
+    metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int])
+    metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
+    metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
+    metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
     metrics
   }
 
   def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
     val metrics = new ShuffleWriteMetrics
-    metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
-    metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
+    metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
+    metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
     metrics
   }
 
@@ -685,7 +685,7 @@ private[spark] object JsonProtocol {
   def outputMetricsFromJson(json: JValue): OutputMetrics = {
     val metrics = new OutputMetrics(
       DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
-    metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
+    metrics.setBytesWritten((json \ "Bytes Written").extract[Long])
     metrics
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 15bda1c..6ba0384 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -757,12 +757,12 @@ private[spark] class ExternalSorter[K, V, C](
       }
     }
 
-    context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
-    context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+    context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
+    context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
     context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
       if (curWriteMetrics != null) {
-        m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
-        m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
+        m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
+        m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index c9417ea..68074ae 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -140,7 +140,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(listener.stageIdToData.size === 0)
 
     // finish this task, should get updated shuffleRead
-    shuffleReadMetrics.remoteBytesRead = 1000
+    shuffleReadMetrics.incRemoteBytesRead(1000)
     taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
     var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
@@ -226,18 +226,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       val shuffleWriteMetrics = new ShuffleWriteMetrics()
       taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
       taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
-      shuffleReadMetrics.remoteBytesRead = base + 1
-      shuffleReadMetrics.remoteBlocksFetched = base + 2
-      shuffleWriteMetrics.shuffleBytesWritten = base + 3
-      taskMetrics.executorRunTime = base + 4
-      taskMetrics.diskBytesSpilled = base + 5
-      taskMetrics.memoryBytesSpilled = base + 6
+      shuffleReadMetrics.incRemoteBytesRead(base + 1)
+      shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
+      shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
+      taskMetrics.setExecutorRunTime(base + 4)
+      taskMetrics.incDiskBytesSpilled(base + 5)
+      taskMetrics.incMemoryBytesSpilled(base + 6)
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
       taskMetrics.setInputMetrics(Some(inputMetrics))
       inputMetrics.addBytesRead(base + 7)
       val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
       taskMetrics.outputMetrics = Some(outputMetrics)
-      outputMetrics.bytesWritten = base + 8
+      outputMetrics.setBytesWritten(base + 8)
       taskMetrics
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3453d578/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index db400b4..0357fc6 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -641,13 +641,13 @@ class JsonProtocolSuite extends FunSuite {
       hasHadoopInput: Boolean,
       hasOutput: Boolean) = {
     val t = new TaskMetrics
-    t.hostname = "localhost"
-    t.executorDeserializeTime = a
-    t.executorRunTime = b
-    t.resultSize = c
-    t.jvmGCTime = d
-    t.resultSerializationTime = a + b
-    t.memoryBytesSpilled = a + c
+    t.setHostname("localhost")
+    t.setExecutorDeserializeTime(a)
+    t.setExecutorRunTime(b)
+    t.setResultSize(c)
+    t.setJvmGCTime(d)
+    t.setResultSerializationTime(a + b)
+    t.incMemoryBytesSpilled(a + c)
 
     if (hasHadoopInput) {
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
@@ -655,20 +655,20 @@ class JsonProtocolSuite extends FunSuite {
       t.setInputMetrics(Some(inputMetrics))
     } else {
       val sr = new ShuffleReadMetrics
-      sr.remoteBytesRead = b + d
-      sr.localBlocksFetched = e
-      sr.fetchWaitTime = a + d
-      sr.remoteBlocksFetched = f
+      sr.incRemoteBytesRead(b + d)
+      sr.incLocalBlocksFetched(e)
+      sr.incFetchWaitTime(a + d)
+      sr.incRemoteBlocksFetched(f)
       t.setShuffleReadMetrics(Some(sr))
     }
     if (hasOutput) {
       val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
-      outputMetrics.bytesWritten = a + b + c
+      outputMetrics.setBytesWritten(a + b + c)
       t.outputMetrics = Some(outputMetrics)
     } else {
       val sw = new ShuffleWriteMetrics
-      sw.shuffleBytesWritten = a + b + c
-      sw.shuffleWriteTime = b + c + d
+      sw.incShuffleBytesWritten(a + b + c)
+      sw.incShuffleWriteTime(b + c + d)
       t.shuffleWriteMetrics = Some(sw)
     }
     // Make at most 6 blocks

