Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:59 UTC

[15/50] [abbrv] git commit: Fine-tuned defaults for Akka and restored tracking of disassociated events, since they are delivered when a remote TCP socket is closed. Also made the transport failure detector's heartbeat interval larger, since it is mostly unneeded now that we

Fine-tuned defaults for Akka and restored tracking of disassociated events, since they are delivered when a remote TCP socket is closed. Also made the transport failure detector's heartbeat interval larger, since it is mostly unneeded now that we are using remote death watch instead.
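
For readers following along, the mechanism this commit restores is Akka's remoting event stream: an actor subscribes to RemotingLifecycleEvent and then receives a DisassociatedEvent when the remote TCP socket closes, even for peers it never death-watched. A minimal sketch against the Akka 2.2-era API used here (the actor name and log text are illustrative, not Spark's):

    import akka.actor.Actor
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    class PeerTracker extends Actor {
      // Subscribe once; the event stream then delivers remoting lifecycle
      // events (associations, errors, disassociations) to this actor.
      override def preStart() {
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          // Fired when the remote TCP socket closes, even for unwatched peers.
          println(s"Remote system at $remoteAddress disassociated")
        case _ => // ignore other lifecycle events
      }
    }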


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/77929cfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/77929cfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/77929cfe

Branch: refs/heads/master
Commit: 77929cfeed95905106f5b3891e8de1b1c312d119
Parents: 95d8dbc
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Fri Nov 22 19:46:39 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Mon Nov 25 14:13:21 2013 +0530

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 23 +++++++++++++++-----
 .../org/apache/spark/deploy/worker/Worker.scala | 12 ++++++----
 .../executor/CoarseGrainedExecutorBackend.scala | 11 +++++-----
 .../cluster/CoarseGrainedSchedulerBackend.scala |  3 +++
 .../scala/org/apache/spark/util/AkkaUtils.scala | 12 +++++-----
 5 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a7cfc25..25f5927 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.deploy.master
 
-import java.util.Date
 import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.Date
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
@@ -28,6 +29,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
 import akka.actor._
 import akka.pattern.ask
 import akka.remote._
+import akka.serialization.SerializationExtension
 import akka.util.Timeout
 
 import org.apache.spark.{Logging, SparkException}
@@ -40,11 +42,6 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.actor.Terminated
-import akka.serialization.SerializationExtension
-import java.util.concurrent.TimeUnit
-
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   import context.dispatcher
@@ -102,6 +99,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -267,11 +265,20 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case Terminated(actor) => {
       // The disconnected actor could've been either a worker or an app; remove whichever of
       // those we have an entry for in the corresponding actor hashmap
+      logInfo(s"$actor got terminated, removing it.")
       actorToWorker.get(actor).foreach(removeWorker)
       actorToApp.get(actor).foreach(finishApplication)
       if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
+    case DisassociatedEvent(_, address, _) => {
+      // The disconnected client could've been either a worker or an app; remove whichever it was
+      logInfo(s"$address got disassociated, removing it.")
+      addressToWorker.get(address).foreach(removeWorker)
+      addressToApp.get(address).foreach(finishApplication)
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+    }
+
     case RequestMasterState => {
       sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
         state)
@@ -431,6 +438,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
+    context.stop(worker.actor)
+    context.unwatch(worker.actor)
     persistenceEngine.removeWorker(worker)
   }
 
@@ -493,6 +502,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         app.driver ! ApplicationRemoved(state.toString)
       }
       persistenceEngine.removeApplication(app)
+      context.stop(app.driver)
+      context.unwatch(app.driver)
       schedule()
     }
   }
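
The Master hunk above pairs two cleanup paths: remote death watch reports a dead peer by ActorRef via Terminated, while the event stream reports it by remote Address via DisassociatedEvent, so the same peer must be indexed both ways. A hypothetical sketch of that two-map bookkeeping (names like refToId are illustrative, not Spark's):

    import akka.actor.{Actor, ActorRef, Address, Terminated}
    import akka.remote.DisassociatedEvent
    import scala.collection.mutable.HashMap

    class Registry extends Actor {
      // The same peer appears in both maps because the two failure
      // notifications identify it differently.
      val refToId = new HashMap[ActorRef, String]
      val addressToId = new HashMap[Address, String]

      def remove(id: String) { /* drop state registered under id */ }

      def receive = {
        case Terminated(ref) =>                    // from context.watch(ref)
          refToId.get(ref).foreach(remove)
        case DisassociatedEvent(_, address, _) =>  // from the event stream
          addressToId.get(address).foreach(remove)
      }
    }

The Worker and CoarseGrainedSchedulerBackend hunks below apply the same pattern, with a single master peer and an executor address map respectively.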

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 9472c9a..3a7d0b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.deploy.worker
 
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.io.File
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor._
+import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
+
 import org.apache.spark.Logging
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
@@ -36,10 +38,8 @@ import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
 import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
 import org.apache.spark.deploy.DeployMessages.Heartbeat
 import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.remote.DisassociatedEvent
 import org.apache.spark.deploy.DeployMessages.LaunchExecutor
 import org.apache.spark.deploy.DeployMessages.RegisterWorker
 
@@ -124,7 +124,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
-
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
     registerWithMaster()
 
@@ -249,6 +249,10 @@ private[spark] class Worker(
       logInfo(s"$actor_ terminated !")
       masterDisconnected()
 
+    case x: DisassociatedEvent =>
+      logInfo(s"$x Disassociated !")
+      masterDisconnected()
+
     case RequestWorkerState => {
       sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
         finishedExecutors.values.toList, activeMasterUrl, cores, memory,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a98ec06..2818a77 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,11 +26,6 @@ import org.apache.spark.Logging
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.remote.DisassociatedEvent
-import akka.remote.AssociationErrorEvent
-import akka.remote.DisassociatedEvent
-import akka.actor.Terminated
-
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
@@ -82,7 +77,11 @@ private[spark] class CoarseGrainedExecutorBackend(
       }
 
     case Terminated(actor) =>
-      logError(s"Driver $actor terminated or disconnected! Shutting down.")
+      logError(s"Driver $actor terminated, Shutting down.")
+      System.exit(1)
+
+    case x: DisassociatedEvent =>
+      logError(s"Driver $x disassociated! Shutting down.")
       System.exit(1)
 
     case StopExecutor =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 821c30a..e316f6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -121,6 +121,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
+      case DisassociatedEvent(_, address, _) => 
+        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
+
     }
 
     // Make fake resource offers on all executors

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 2a83138..90a5387 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,9 +44,11 @@ private[spark] object AkkaUtils {
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
 
-    val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
-    val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
-    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+    val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
+    val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
+    // Since we have our own heartbeat mechanism and TCP already tracks connections,
+    // the transport failure detector adds little value, so a relatively large interval suffices.
+    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
 
     val akkaConf = ConfigFactory.parseString(
       s"""
@@ -56,8 +58,8 @@ private[spark] object AkkaUtils {
       |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
       |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
       |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
       |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
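
To see how the tuned values compose, here is an illustrative, standalone rendering of the resulting remoting config (assuming Typesafe Config and the Akka 2.2 setting names shown in the diff): the watch failure detector keeps its short heartbeat to drive remote death watch, while the transport failure detector is relaxed because a closed socket already surfaces as a DisassociatedEvent.

    import com.typesafe.config.ConfigFactory

    object TunedAkkaConf {
      val pauses = 60  // spark.akka.pauses default after this change

      val conf = ConfigFactory.parseString(s"""
        |akka.remote.watch-failure-detector.heartbeat-interval = 3 s
        |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $pauses s
        |akka.remote.watch-failure-detector.threshold = 12.0
        |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
        |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${pauses + 10} s
        """.stripMargin)
    }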