Posted to commits@spark.apache.org by an...@apache.org on 2014/10/24 22:51:31 UTC

git commit: [SPARK-4006] In long running contexts, we encountered the situation of d...

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 80dde80a6 -> 59297e951


[SPARK-4006] In long running contexts, we encountered the situation of a double register without a remove in between. The cause is unknown and is presumed to be a transient network issue.

However, since the second registration arrives with a BlockManagerId on a different port, blockManagerInfo.contains() returns false while blockManagerIdByExecutor.get() returns Some. This inconsistency is caught by a conditional that calls System.exit(1), which is a serious robustness issue for us.
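
A minimal sketch of the inconsistency, using a hypothetical simplified BlockManagerId case class (not Spark's real class) to show why the two maps can disagree when the same executor re-registers on a new port:

    import scala.collection.mutable

    // Simplified stand-in: equality includes the port, as with the real BlockManagerId.
    case class BlockManagerId(executorId: String, host: String, port: Int)

    val oldId = BlockManagerId("exec-1", "10.0.0.5", 49001)
    val newId = BlockManagerId("exec-1", "10.0.0.5", 49017)  // same executor, different port

    val blockManagerInfo = mutable.Map(oldId -> "block manager info")
    val blockManagerIdByExecutor = mutable.Map(oldId.executorId -> oldId)

    // On re-registration with newId, the two maps disagree:
    blockManagerInfo.contains(newId)                // false -> takes the registration path
    blockManagerIdByExecutor.get(newId.executorId)  // Some(oldId) -> pre-fix code did System.exit(1)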

The fix: simply remove the old id from both maps during registration when this happens. This mimics the behavior of expireDeadHosts(), doing local cleanup of the maps before adding the new entries.
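
A minimal sketch of the approach the fix takes, again with simplified stand-in types rather than the actual BlockManagerMasterActor code (the real change, which calls removeExecutor(), is in the diff below):

    import scala.collection.mutable

    case class BlockManagerId(executorId: String, host: String, port: Int)  // simplified stand-in

    val blockManagerInfo = mutable.Map.empty[BlockManagerId, Long]          // value simplified to maxMemSize
    val blockManagerIdByExecutor = mutable.Map.empty[String, BlockManagerId]

    def register(id: BlockManagerId, maxMemSize: Long): Unit = {
      if (!blockManagerInfo.contains(id)) {
        blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
          // The pre-fix code logged an error here and called System.exit(1).
          // Instead, drop the stale executor entry from both maps, the same local
          // cleanup that expireDeadHosts() performs for timed-out block managers.
          blockManagerInfo.remove(oldId)
          blockManagerIdByExecutor.remove(oldId.executorId)
        }
        blockManagerIdByExecutor(id.executorId) = id
        blockManagerInfo(id) = maxMemSize
      }
    }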

Also added some logging for register and unregister.

This is the same change as https://github.com/apache/spark/pull/2886, except it is against branch-1.1.

Author: Tal Sliwowicz <ta...@taboola.com>

Closes #2915 from tsliwowicz/branch-1.1-block-mgr-removal and squashes the following commits:

d122236 [Tal Sliwowicz] [SPARK-4006] In long running contexts, we encountered the situation of double registe...


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59297e95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59297e95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59297e95

Branch: refs/heads/branch-1.1
Commit: 59297e9510557edd4828a3961aa3559dbeae5f30
Parents: 80dde80
Author: Tal Sliwowicz <ta...@taboola.com>
Authored: Fri Oct 24 13:51:25 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Oct 24 13:51:25 2014 -0700

----------------------------------------------------------------------
 .../spark/storage/BlockManagerMasterActor.scala | 25 ++++++++++----------
 1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59297e95/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 3ab0770..dc80148 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -204,6 +204,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       }
     }
     listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+    logInfo(s"Removing block manager $blockManagerId")
   }
 
   private def expireDeadHosts() {
@@ -327,20 +328,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
-        case Some(manager) =>
-          // A block manager of the same executor already exists.
-          // This should never happen. Let's just quit.
-          logError("Got two different block manager registrations on " + id.executorId)
-          System.exit(1)
+        case Some(oldId) =>
+          // A block manager of the same executor already exists, so remove it (assumed dead)
+          logError("Got two different block manager registrations on same executor - " 
+              + s" will replace old one $oldId with new one $id")
+          removeExecutor(id.executorId)  
         case None =>
-          blockManagerIdByExecutor(id.executorId) = id
       }
-
-      logInfo("Registering block manager %s with %s RAM".format(
-        id.hostPort, Utils.bytesToString(maxMemSize)))
-
-      blockManagerInfo(id) =
-        new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+      logInfo("Registering block manager %s with %s RAM, %s".format(
+        id.hostPort, Utils.bytesToString(maxMemSize), id))
+      
+      blockManagerIdByExecutor(id.executorId) = id
+      
+      blockManagerInfo(id) = new BlockManagerInfo(
+        id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
     listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org