You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@spark.apache.org by pw...@apache.org on 2015/05/16 03:06:05 UTC

spark git commit: [SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver

Repository: spark
Updated Branches:
  refs/heads/master e74545647 -> 2c04c8a1a


[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver

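This fixes a bug where an exiting executor could cause the driver's OutputCommitCoordinator to stop. To fix this, an `isDriver` flag is now passed to the OutputCommitCoordinator constructor, and `stop()` only sends `StopCoordinator` and clears its state when that flag is true (i.e. when running on the driver).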

See https://issues.apache.org/jira/browse/SPARK-7563 for more details.

Author: Josh Rosen <jo...@databricks.com>

Closes #6197 from JoshRosen/SPARK-7563 and squashes the following commits:

04b2cc5 [Josh Rosen] [SPARK-7563] OutputCommitCoordinator.stop() should only be executed on the driver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c04c8a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c04c8a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c04c8a1

Branch: refs/heads/master
Commit: 2c04c8a1aed34cce420b3d30d9e885daa6e03d74
Parents: e745456
Author: Josh Rosen <jo...@databricks.com>
Authored: Fri May 15 18:06:01 2015 -0700
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Fri May 15 18:06:01 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala       |  2 +-
 .../apache/spark/scheduler/OutputCommitCoordinator.scala  | 10 ++++++----
 .../spark/scheduler/OutputCommitCoordinatorSuite.scala    |  2 +-
 3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c04c8a1/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a5d831c..3271145 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -379,7 +379,7 @@ object SparkEnv extends Logging {
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      new OutputCommitCoordinator(conf)
+      new OutputCommitCoordinator(conf, isDriver)
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))

http://git-wip-us.apache.org/repos/asf/spark/blob/2c04c8a1/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 0b1d47c..8321037 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -38,7 +38,7 @@ private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttem
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -129,9 +129,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
   }
 
   def stop(): Unit = synchronized {
-    coordinatorRef.foreach(_ send StopCoordinator)
-    coordinatorRef = None
-    authorizedCommittersByStage.clear()
+    if (isDriver) {
+      coordinatorRef.foreach(_ send StopCoordinator)
+      coordinatorRef = None
+      authorizedCommittersByStage.clear()
+    }
   }
 
   // Marked private[scheduler] instead of private so this can be mocked in tests

http://git-wip-us.apache.org/repos/asf/spark/blob/2c04c8a1/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index cf97707..7078a7a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -81,7 +81,7 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
           conf: SparkConf,
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
-        outputCommitCoordinator = spy(new OutputCommitCoordinator(conf))
+        outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org