Posted to commits@spark.apache.org by tg...@apache.org on 2016/04/01 23:23:22 UTC

spark git commit: [SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…

Repository: spark
Updated Branches:
  refs/heads/master 3e991dbc3 -> bd7b91cef


[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…

Currently, when the number of executor failures reaches `maxNumExecutorFailures`, the `ApplicationMaster` is killed and a new one registers in its place, and a new `YarnAllocator` instance is created along with it.
However, `executorIdCounter` in the new `YarnAllocator` resets to `0`, so the IDs of new executors start from `1` again and collide with executors created before the restart, which causes FetchFailedException.
This only happens in yarn client mode, so it is a yarn-client-mode issue. For more details, see [SPARK-12864](https://issues.apache.org/jira/browse/SPARK-12864).
This PR introduces a mechanism to initialize `executorIdCounter` after the `ApplicationMaster` is killed.
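
To illustrate the idea (a simplified sketch of the handshake, not the exact patch; the wrapper function below is hypothetical): the driver remembers the highest executor ID it has ever registered and replies with it on request, and a newly created `YarnAllocator` seeds its counter from that reply instead of starting at `0`.

```scala
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId

// Driver side (sketch): on each RegisterExecutor the scheduler backend records the largest
// executor ID seen so far and answers RetrieveLastAllocatedExecutorId with that value.
//
// Allocator side (sketch): ask the driver for that value before handing out new IDs, so after
// an AM restart the counter continues from e.g. 7 instead of restarting at 0.
def initialExecutorIdCounter(driverRef: RpcEndpointRef): Int =
  driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
```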

Author: zhonghaihua <79...@qq.com>

Closes #10794 from zhonghaihua/initExecutorIdCounterAfterAMKilled.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd7b91ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd7b91ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd7b91ce

Branch: refs/heads/master
Commit: bd7b91cefb0d192d808778e6182dcdd2c143e132
Parents: 3e991db
Author: zhonghaihua <79...@qq.com>
Authored: Fri Apr 1 16:23:14 2016 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Fri Apr 1 16:23:14 2016 -0500

----------------------------------------------------------------------
 .../cluster/CoarseGrainedClusterMessage.scala   |  2 ++
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++++++
 .../spark/deploy/yarn/YarnAllocator.scala       | 20 ++++++++++++++++++--
 .../cluster/YarnSchedulerBackend.scala          |  3 +++
 4 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd7b91ce/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 8d5c11d..46a8291 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object RetrieveSparkProps extends CoarseGrainedClusterMessage
 
+  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
+
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd7b91ce/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index eb4f533..70470cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -79,6 +79,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
+  // The max executor ID registered so far, used when the ApplicationMaster re-registers
+  protected var currentExecutorIdCounter = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -156,6 +159,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
             executorDataMap.put(executorId, data)
+            if (currentExecutorIdCounter < executorId.toInt) {
+              currentExecutorIdCounter = executorId.toInt
+            }
             if (numPendingExecutors > 0) {
               numPendingExecutors -= 1
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")

http://git-wip-us.apache.org/repos/asf/spark/blob/bd7b91ce/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7d71a64..b0bfe85 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -40,6 +40,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -83,8 +84,23 @@ private[yarn] class YarnAllocator(
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
   @volatile private var numExecutorsRunning = 0
-  // Used to generate a unique ID per executor
-  private var executorIdCounter = 0
+
+  /**
+   * Used to generate a unique ID per executor
+   *
+   * Init `executorIdCounter`: when the AM restarts, `executorIdCounter` would otherwise reset
+   * to 0 and the IDs of new executors would start from 1 again, conflicting with executors
+   * created before the restart. So we initialize `executorIdCounter` by asking the driver
+   * for the max executor ID it has registered.
+   *
+   * This executor ID conflict only happens in yarn client mode, so it is a yarn-client-mode
+   * issue. For more details, see the JIRA.
+   *
+   * @see SPARK-12864
+   */
+  private var executorIdCounter: Int =
+    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+
   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
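
For context, an illustrative-only sketch (not the actual YarnAllocator code; the class and names below are made up) of why seeding the counter avoids the collision: new executor IDs keep increasing past the driver-supplied starting point, so a restarted allocator never hands out an ID that is already in use.

    // Hypothetical sketch: the counter continues from the driver-supplied value,
    // so after executors 1..7 exist, the first ID issued post-restart is "8", not "1".
    class IdSketch(startFrom: Int) {
      private var counter = startFrom
      def next(): String = { counter += 1; counter.toString }
    }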

http://git-wip-us.apache.org/repos/asf/spark/blob/bd7b91ce/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index a878163..5aeaf44 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -292,6 +292,9 @@ private[spark] abstract class YarnSchedulerBackend(
             logWarning("Attempted to kill executors before the AM has registered!")
             context.reply(false)
         }
+
+      case RetrieveLastAllocatedExecutorId =>
+        context.reply(currentExecutorIdCounter)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org