Posted to commits@spark.apache.org by va...@apache.org on 2015/10/08 18:48:04 UTC

spark git commit: [SPARK-10987] [YARN] Workaround for missing netty rpc disconnection event.

Repository: spark
Updated Branches:
  refs/heads/master 2df882ef1 -> 56a9692fc


[SPARK-10987] [YARN] Workaround for missing netty rpc disconnection event.

In YARN client mode, when the AM connects to the driver, the driver may
never need to send a message back to the AM (e.g., when neither dynamic
allocation nor preemption is in play). Because no driver-to-AM connection
is ever established, the netty rpc backend delivers no disconnection event
to the AM's endpoints, and the AM never exits after the driver shuts down.

The proper fix is too complicated, so this is a quick hack to unblock YARN
client mode until the real fix can be done. It forces the driver to send a
message to the AM when the AM registers, establishing the connection and
enabling the disconnection event when the driver goes away.
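
A minimal, self-contained Scala sketch of the pattern (the Endpoint types
below are illustrative stand-ins, not Spark's private RpcEndpoint API; only
the shape of the fix mirrors the diff further down):

sealed trait Message
case object DriverHello extends Message
final case class RegisterClusterManager(am: EndpointRef) extends Message

// Illustrative stand-in for a remote endpoint reference. In the real rpc
// backend, sending a message is what opens (and later tears down) the
// underlying connection.
final class EndpointRef(handler: Message => Unit) {
  def send(msg: Message): Unit = handler(msg)
}

// Driver side: on AM registration, immediately send a message back. This
// guarantees a driver -> AM connection exists, so the AM is notified of a
// disconnection when the driver goes away.
final class DriverEndpoint {
  private var amEndpoint: Option[EndpointRef] = None

  def receive(msg: Message): Unit = msg match {
    case RegisterClusterManager(am) =>
      amEndpoint = Some(am)
      am.send(DriverHello)  // see SPARK-10987
    case _ => // other driver messages elided
  }
}

// AM side: the message carries no payload and needs no handling; the
// connection it forces open is the entire point.
final class AmEndpoint {
  def receive(msg: Message): Unit = msg match {
    case DriverHello => // no-op
    case _ => // other AM messages elided
  }
}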

Also fixed a minor side issue: with the netty rpc backend, an executor that
is shutting down needs to send an "ack" back to the driver, but the ack was
never sent because the handler shut down the rpc env before returning. The
shutdown is now deferred slightly so that the ack can go out first.
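
A minimal sketch of that deferral pattern, again with illustrative stand-in
types rather than Spark's private API. The property it models is that the
rpc layer only sends the ack after the message handler returns, so the
handler must not tear the rpc env down inline:

import java.util.concurrent.LinkedBlockingQueue

sealed trait Msg
case object StopExecutor extends Msg
case object Shutdown extends Msg

final class ExecutorBackend {
  private val inbox = new LinkedBlockingQueue[Msg]()

  def send(msg: Msg): Unit = inbox.put(msg)

  // Each message is fully handled (and its ack sent) before the next one
  // is dequeued, mimicking the one-at-a-time dispatch of the rpc inbox.
  def runOnce(): Unit = inbox.take() match {
    case StopExecutor =>
      // Cannot shut down here: the ack for this message has not been sent
      // yet. Re-enqueue the actual shutdown as a message to ourselves.
      send(Shutdown)
    case Shutdown =>
      // Stand-in for the real teardown: executor.stop(); stop();
      // rpcEnv.shutdown()
      println("stopping executor and shutting down rpc env")
  }
}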

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #9021 from vanzin/SPARK-10987.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56a9692f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56a9692f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56a9692f

Branch: refs/heads/master
Commit: 56a9692fc06077e31b37c00957e8011235f4e4eb
Parents: 2df882e
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Oct 8 09:47:58 2015 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Oct 8 09:47:58 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala  | 5 +++++
 .../spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 7 +++++++
 .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 2 ++
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala      | 3 +++
 4 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56a9692f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index fcd76ec..49059de 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -110,6 +110,11 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     case StopExecutor =>
       logInfo("Driver commanded a shutdown")
+      // Cannot shutdown here because an ack may need to be sent back to the caller. So send
+      // a message to self to actually do the shutdown.
+      self.send(Shutdown)
+
+    case Shutdown =>
       executor.stop()
       stop()
       rpcEnv.shutdown()

http://git-wip-us.apache.org/repos/asf/spark/blob/56a9692f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index d947436..e0d25dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -100,4 +100,11 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
 
+  // Used internally by executors to shut themselves down.
+  case object Shutdown extends CoarseGrainedClusterMessage
+
+  // SPARK-10987: workaround for netty RPC issue; forces a connection from the driver back
+  // to the AM.
+  case object DriverHello extends CoarseGrainedClusterMessage
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/56a9692f/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e0107f9..38218b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -170,6 +170,8 @@ private[spark] abstract class YarnSchedulerBackend(
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
         amEndpoint = Option(am)
+        // See SPARK-10987.
+        am.send(DriverHello)
 
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)

http://git-wip-us.apache.org/repos/asf/spark/blob/56a9692f/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a2ccdc0..3791eea 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -564,6 +564,9 @@ private[spark] class ApplicationMaster(
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")
         driver.send(x)
+
+      case DriverHello =>
+        // SPARK-10987: no action needed for this message.
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

