You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/08 01:19:27 UTC

spark git commit: [SPARK-6737] Fix memory leak in OutputCommitCoordinator

Repository: spark
Updated Branches:
  refs/heads/master 77bcceb9f -> c83e03948


[SPARK-6737] Fix memory leak in OutputCommitCoordinator

This patch fixes a memory leak in the DAGScheduler, which caused us to leak a map entry per submitted stage.  The problem is that the OutputCommitCoordinator needs to be informed when stages end in order to remove entries from its `authorizedCommitters` map, but the DAGScheduler only called it in one of the four code paths that are used to mark stages as completed.

This patch fixes this issue by consolidating the processing of stage completion into a new `markStageAsFinished` method and updates DAGSchedulerSuite's `assertDataStructuresEmpty` assertion to also check the OutputCommitCoordinator data structures.  I've also added a comment at the top of DAGScheduler so that we remember to update this test when adding new data structures.

Author: Josh Rosen <jo...@databricks.com>

Closes #5397 from JoshRosen/SPARK-6737 and squashes the following commits:

af3b02f [Josh Rosen] Consolidate stage completion handling code in a single method.
e96ce3a [Josh Rosen] Consolidate stage completion handling code in a single method.
3052aea [Josh Rosen] Comment update
7896899 [Josh Rosen] Fix SPARK-6737 by informing OutputCommitCoordinator of all stage end events.
4ead1dc [Josh Rosen] Add regression tests for SPARK-6737


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c83e0394
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c83e0394
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c83e0394

Branch: refs/heads/master
Commit: c83e03948b184ffb3a9418fecc4d2c26ae33b057
Parents: 77bcceb
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Apr 7 16:18:55 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Apr 7 16:18:55 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 63 +++++++++++---------
 .../scheduler/OutputCommitCoordinator.scala     |  7 +++
 .../spark/scheduler/DAGSchedulerSuite.scala     |  1 +
 3 files changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c83e0394/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c82ae4b..c912520 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -50,6 +50,10 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
  * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
+ * Here's a checklist to use when making or reviewing changes to this class:
+ *
+ *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
+ *    include the new structure. This will help to catch memory leaks.
  */
 private[spark]
 class DAGScheduler(
@@ -111,6 +115,8 @@ class DAGScheduler(
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
+  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
+
   // A closure serializer that we reuse.
   // This is only safe because DAGScheduler runs in a single thread.
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
@@ -128,8 +134,6 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
-  private val outputCommitCoordinator = env.outputCommitCoordinator
-
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -710,9 +714,10 @@ class DAGScheduler(
       // cancelling the stages because if the DAG scheduler is stopped, the entire application
       // is in the process of getting stopped.
       val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
-      runningStages.foreach { stage =>
-        stage.latestInfo.stageFailed(stageFailedMessage)
-        listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // The `toArray` here is necessary so that we don't iterate over `runningStages` while
+      // mutating it.
+      runningStages.toArray.foreach { stage =>
+        markStageAsFinished(stage, Some(stageFailedMessage))
       }
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
@@ -887,10 +892,9 @@ class DAGScheduler(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
-      // Because we posted SparkListenerStageSubmitted earlier, we should post
-      // SparkListenerStageCompleted here in case there are no tasks to run.
-      outputCommitCoordinator.stageEnd(stage.id)
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // Because we posted SparkListenerStageSubmitted earlier, we should mark
+      // the stage as completed here in case there are no tasks to run
+      markStageAsFinished(stage, None)
 
       val debugString = stage match {
         case stage: ShuffleMapStage =>
@@ -902,7 +906,6 @@ class DAGScheduler(
           s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
-      runningStages -= stage
     }
   }
 
@@ -968,22 +971,6 @@ class DAGScheduler(
     }
 
     val stage = stageIdToStage(task.stageId)
-
-    def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
-      val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
-        case _ => "Unknown"
-      }
-      if (errorMessage.isEmpty) {
-        logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
-      } else {
-        stage.latestInfo.stageFailed(errorMessage.get)
-        logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
-      }
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
-      runningStages -= stage
-    }
     event.reason match {
       case Success =>
         listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
@@ -1099,7 +1086,6 @@ class DAGScheduler(
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
           markStageAsFinished(failedStage, Some(failureMessage))
-          runningStages -= failedStage
         }
 
         if (disallowStageRetryForTest) {
@@ -1216,6 +1202,26 @@ class DAGScheduler(
   }
 
   /**
+   * Marks a stage as finished and removes it from the list of running stages.
+   */
+  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
+    val serviceTime = stage.latestInfo.submissionTime match {
+      case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
+      case _ => "Unknown"
+    }
+    if (errorMessage.isEmpty) {
+      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
+      stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+    } else {
+      stage.latestInfo.stageFailed(errorMessage.get)
+      logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+    }
+    outputCommitCoordinator.stageEnd(stage.id)
+    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+    runningStages -= stage
+  }
+
+  /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
    */
@@ -1264,8 +1270,7 @@ class DAGScheduler(
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
               taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-              stage.latestInfo.stageFailed(failureReason)
-              listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+              markStageAsFinished(stage, Some(failureReason))
             } catch {
               case e: UnsupportedOperationException =>
                 logInfo(s"Could not cancel tasks for stage $stageId", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/c83e0394/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 9e29fd1..7c184b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -60,6 +60,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
   private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
 
   /**
+   * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
+   */
+  def isEmpty: Boolean = {
+    authorizedCommittersByStage.isEmpty
+  }
+
+  /**
    * Called by tasks to ask whether they can commit their output to HDFS.
    *
    * If a task attempt has been authorized to commit, then all other attempts to commit the same

http://git-wip-us.apache.org/repos/asf/spark/blob/c83e0394/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 63360a0..eb759f0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -783,6 +783,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(scheduler.runningStages.isEmpty)
     assert(scheduler.shuffleToMapStage.isEmpty)
     assert(scheduler.waitingStages.isEmpty)
+    assert(scheduler.outputCommitCoordinator.isEmpty)
   }
 
   // Nothing in this test should break if the task info's fields are null, but


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org