Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:42:05 UTC

[21/50] [abbrv] git commit: Applied improvements from the review comments and followed the Boy Scout Rule.

Applied improvements from the review comments and followed the Boy Scout Rule.
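
In short, the commit: consolidates duplicate and unused imports (MapOutputTracker, Master, BlockManagerMaster); scopes the Akka scheduler's ExecutionContext to the owning actor system instead of the JVM-global one (DAGScheduler, ClusterScheduler); renames RegisterWorker pattern variables so they no longer shadow the Master's constructor parameters; makes the Worker react to DisassociatedEvent only for the master's address; drops a redundant ActorRef-keyed map in CoarseGrainedSchedulerBackend; and renames the spark.akka.pauses property to spark.akka.heartbeat.pauses.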


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/54862af5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/54862af5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/54862af5

Branch: refs/heads/master
Commit: 54862af5ee813030ead80ec097f48620ddb974fc
Parents: dca946f
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Wed Nov 27 14:26:28 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Wed Nov 27 14:26:28 2013 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/MapOutputTracker.scala    |  8 ++------
 .../org/apache/spark/deploy/master/Master.scala      |  8 +++-----
 .../org/apache/spark/deploy/worker/Worker.scala      |  9 +++++++--
 .../org/apache/spark/scheduler/DAGScheduler.scala    |  2 +-
 .../spark/scheduler/cluster/ClusterScheduler.scala   |  4 +---
 .../cluster/CoarseGrainedSchedulerBackend.scala      |  5 +----
 .../apache/spark/storage/BlockManagerMaster.scala    | 15 +++------------
 .../main/scala/org/apache/spark/util/AkkaUtils.scala |  5 +++--
 docs/configuration.md                                |  4 ++--
 9 files changed, 23 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a686b53..88a7f24 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,15 +21,11 @@ import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.HashSet
+import scala.concurrent.Await
+import scala.concurrent.duration._
 
 import akka.actor._
-import scala.concurrent.Await
 import akka.pattern.ask
-import akka.remote._
-
-import scala.concurrent.duration.Duration
-import akka.util.Timeout
-import scala.concurrent.duration._
 
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0e2b461..c627dd3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -39,9 +39,6 @@ import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   import context.dispatcher
@@ -159,7 +156,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       System.exit(0)
     }
 
-    case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
+    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.megabytesToString(memory)))
       if (state == RecoveryState.STANDBY) {
@@ -167,7 +164,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       } else if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
       } else {
-        val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+          sender, workerWebUiPort, publicAddress)
         registerWorker(worker)
         persistenceEngine.addWorker(worker)
         sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
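
The renames above matter because the Master actor's constructor already binds `host` and `webUiPort`; lowercase names in a match pattern introduce fresh variables that silently shadow those parameters. A minimal standalone sketch of the pitfall (names are illustrative, not from the commit):

    class Server(host: String) {
      def describe(peer: (String, Int)): String = peer match {
        // `host` here is a new pattern variable: it binds the tuple's first
        // element and shadows the constructor parameter of the same name.
        case (host, port) => s"peer at $host:$port"
      }
    }

    val s = new Server("master.example.com")
    println(s.describe(("worker.example.com", 7078)))  // prints the peer's host

Renaming the pattern variables to workerHost and workerWebUiPort keeps both values addressable and makes the intent explicit.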

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0a183af..808b54c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -27,7 +27,7 @@ import scala.concurrent.duration._
 import akka.actor._
 import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
@@ -73,6 +73,7 @@ private[spark] class Worker(
 
   val masterLock: Object = new Object()
   var master: ActorSelection = null
+  var masterAddress: Address = null
   var activeMasterUrl: String = ""
   var activeMasterWebUiUrl : String = ""
   @volatile var registered = false
@@ -136,6 +137,10 @@ private[spark] class Worker(
       activeMasterUrl = url
       activeMasterWebUiUrl = uiUrl
       master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+      masterAddress = activeMasterUrl match {
+        case Master.sparkUrlRegex(_host, _port) => Address("akka.tcp", Master.systemName, _host, _port.toInt)
+        case x => throw new SparkException("Invalid spark URL:"+x)
+      }
       connected = true
     }
   }
@@ -240,7 +245,7 @@ private[spark] class Worker(
         }
       }
 
-    case x: DisassociatedEvent =>
+    case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
       logInfo(s"$x Disassociated !")
       masterDisconnected()
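
Two things change here: the Worker now derives the master's Akka Address from the spark URL up front, and it reacts to a DisassociatedEvent only when the event's remoteAddress is the master's, rather than on any remote disassociation. Master.sparkUrlRegex and Master.systemName are defined outside this diff; a rough sketch of how such parsing could look, with both values assumed:

    import akka.actor.Address
    import org.apache.spark.SparkException

    // Assumed shapes, not the commit's actual definitions:
    val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
    val systemName = "sparkMaster"

    def toMasterAddress(sparkUrl: String): Address = sparkUrl match {
      case sparkUrlRegex(host, port) =>
        Address("akka.tcp", systemName, host, port.toInt)
      case x =>
        throw new SparkException("Invalid spark URL: " + x)
    }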
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fdea3f6..773e9ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -22,7 +22,6 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
 
@@ -115,6 +114,7 @@ class DAGScheduler(
 
   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
     override def preStart() {
+      import context.dispatcher
       context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT) {
         if (failed.size > 0) {
           resubmitFailedStages()
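
Moving `import context.dispatcher` inside the actor replaces scala.concurrent.ExecutionContext.Implicits.global as the implicit ExecutionContext for the scheduler call, so the periodic resubmit task runs on the actor system's own dispatcher rather than on the JVM-global pool. A minimal standalone sketch of the pattern (actor and message names are illustrative):

    import akka.actor._
    import scala.concurrent.duration._

    class Resubmitter extends Actor {
      override def preStart() {
        // context.dispatcher is the actor system's ExecutionContext; the
        // schedule call below needs one implicitly.
        import context.dispatcher
        context.system.scheduler.schedule(5.seconds, 5.seconds, self, "resubmit")
      }
      def receive = {
        case "resubmit" => ()  // re-check failed stages here
      }
    }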

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 7c9d6a9..8056cb2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -24,8 +24,6 @@ import java.util.{TimerTask, Timer}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
-
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 
 import org.apache.spark._
@@ -123,7 +121,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
     if (System.getProperty("spark.speculation", "false").toBoolean) {
       logInfo("Starting speculative execution thread")
-
+      import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
         checkSpeculatableTasks()
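
Same idea outside an actor: an ActorSystem exposes its default dispatcher directly, so the speculation check can borrow it instead of the global pool. A hedged sketch, with the system and callback as stand-ins:

    import akka.actor.ActorSystem
    import scala.concurrent.duration._

    val system = ActorSystem("sketch")
    import system.dispatcher  // implicit ExecutionContext for the scheduler
    system.scheduler.schedule(100.milliseconds, 100.milliseconds) {
      // checkSpeculatableTasks() would run here, on the system's dispatcher
    }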

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d614dcb..f5e8766 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.{SparkException, Logging, TaskState}
 import org.apache.spark.scheduler.TaskDescription
@@ -52,7 +52,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
     private val executorAddress = new HashMap[String, Address]
     private val executorHost = new HashMap[String, String]
     private val freeCores = new HashMap[String, Int]
-    private val actorToExecutorId = new HashMap[ActorRef, String]
     private val addressToExecutorId = new HashMap[Address, String]
 
     override def preStart() {
@@ -77,7 +76,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
           executorHost(executorId) = Utils.parseHostPort(hostPort)._1
           freeCores(executorId) = cores
           executorAddress(executorId) = sender.path.address
-          actorToExecutorId(sender) = executorId
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
           makeOffers()
@@ -147,7 +145,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       if (executorActor.contains(executorId)) {
         logInfo("Executor " + executorId + " disconnected, so removing it")
         val numCores = freeCores(executorId)
-        actorToExecutorId -= executorActor(executorId)
         addressToExecutorId -= executorAddress(executorId)
         executorActor -= executorId
         executorHost -= executorId
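
actorToExecutorId was redundant: every lookup it served can go through addressToExecutorId, since an executor's address is recorded at registration (sender.path.address). A hedged sketch of the address-keyed lookup on disconnect (shapes assumed, not the commit's exact code):

    import akka.actor.Address
    import scala.collection.mutable.HashMap

    val addressToExecutorId = new HashMap[Address, String]

    def onDisassociated(remoteAddress: Address): Unit =
      // One Address-keyed map now answers "which executor disconnected?"
      addressToExecutorId.get(remoteAddress).foreach { executorId =>
        println("Executor " + executorId + " disconnected, so removing it")
      }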

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index a4aa316..e5de16f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,24 +17,15 @@
 
 package org.apache.spark.storage
 
-import akka.actor._
-import scala.concurrent.Await
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext.Implicits.global
 
+import akka.actor._
 import akka.pattern.ask
-import scala.concurrent.duration._
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.storage.BlockManagerMessages.GetLocations
-import org.apache.spark.storage.BlockManagerMessages.GetLocationsMultipleBlockIds
-import org.apache.spark.storage.BlockManagerMessages.RegisterBlockManager
-import org.apache.spark.storage.BlockManagerMessages.HeartBeat
-import org.apache.spark.storage.BlockManagerMessages.RemoveExecutor
-import org.apache.spark.storage.BlockManagerMessages.GetPeers
-import org.apache.spark.storage.BlockManagerMessages.RemoveBlock
-import org.apache.spark.storage.BlockManagerMessages.RemoveRdd
 
 private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
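
The eight per-message imports were dead weight: `BlockManagerMessages._` already brings every member of that object into scope. In Scala, a wildcard import of an object covers all its nested members, e.g.:

    object Messages {
      case object HeartBeat
      case class RemoveBlock(blockId: String)
    }

    import Messages._              // every member of Messages is now in scope,
    val m = RemoveBlock("block-1") // so per-member imports would be redundant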
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 23e9b73..3444d8f 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,8 +44,9 @@ private[spark] object AkkaUtils {
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
 
-    val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
-    val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
+    val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "60").toInt
+    val akkaFailureDetector =
+      System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
     val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "5").toInt 
 
     val akkaConf = ConfigFactory.parseString(
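
These are plain Java system properties read once when the actor system is configured, so the renamed keys can be set before Spark starts. A usage sketch (the values are examples only):

    // Set before the SparkContext / actor system is created:
    System.setProperty("spark.akka.heartbeat.pauses", "120")            // seconds
    System.setProperty("spark.akka.heartbeat.interval", "10")           // seconds
    System.setProperty("spark.akka.failure-detector.threshold", "30.0")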

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54862af5/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 25e7cec..4d1a987 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -281,7 +281,7 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.akka.pauses</td>
+  <td>spark.akka.heartbeat.pauses</td>
   <td>60</td>
   <td>
      Acceptable heart beat pause in seconds for akka, tune this if you expect GC pauses or network delays (reconnections) etc.
@@ -298,7 +298,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.akka.heartbeat.interval</td>
   <td>5</td>
   <td>
-     A larger interval value in seconds reduces network overhead and a smaller value might be more informative for akka's failure detector. Tune this in combination of `spark.akka.pauses` and `spark.akka.failure-detector.threshold` if you need to.
+     A larger interval value in seconds reduces network overhead and a smaller value might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to.
   </td>
 </tr>
 <tr>