Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:48 UTC

[04/50] [abbrv] git commit: Remove deprecated actorFor and use actorSelection everywhere.

Remove deprecated actorFor and use actorSelection everywhere.
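
Background for this change: actorFor was deprecated in Akka 2.2. The ActorRef
it returns for a remote path is looked up once and never re-resolved, so it can
silently point at a dead incarnation after the target actor restarts. An
ActorSelection, by contrast, names a path, and each message sent through it is
routed to whatever actor currently lives at that path. A minimal sketch of the
two calls, using the tracker URL format seen in this commit (illustrative only,
not part of the diff):

    import akka.actor.{ActorRef, ActorSelection, ActorSystem}

    val system = ActorSystem("demo")
    val url = "akka.tcp://spark@localhost:7077/user/MapOutputTracker"

    // Deprecated: eagerly materializes an ActorRef; for remote paths the ref
    // is not revalidated and can break across restarts of the target.
    val ref: ActorRef = system.actorFor(url)

    // Replacement: a selection is just a path; resolution happens per send.
    val sel: ActorSelection = system.actorSelection(url)
    sel ! "ping"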


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6860b79f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6860b79f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6860b79f

Branch: refs/heads/master
Commit: 6860b79f6e4cc0d38b08848f19127c259d9b5069
Parents: a8bfdd4
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Tue Nov 12 12:43:53 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Tue Nov 12 12:43:53 2013 +0530

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  8 +++++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  8 +++----
 .../org/apache/spark/deploy/client/Client.scala | 23 ++++--------------
 .../org/apache/spark/deploy/worker/Worker.scala | 23 +++++++++++++-----
 .../spark/storage/BlockManagerMaster.scala      | 25 ++++++++++++--------
 .../apache/spark/storage/ThreadingTest.scala    |  2 +-
 .../apache/spark/MapOutputTrackerSuite.scala    | 14 +++++------
 .../spark/storage/BlockManagerSuite.scala       |  2 +-
 .../streaming/examples/ActorWordCount.scala     |  2 +-
 .../streaming/dstream/NetworkInputDStream.scala |  2 +-
 10 files changed, 58 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1afb187..6590e97 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -60,7 +60,7 @@ private[spark] class MapOutputTracker extends Logging {
   private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
 
   // Set to the MapOutputTrackerActor living on the driver
-  var trackerActor: ActorRef = _
+  var trackerActor: Either[ActorRef, ActorSelection] = _
 
   private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
 
@@ -79,7 +79,11 @@ private[spark] class MapOutputTracker extends Logging {
   // throw a SparkException if this fails.
   def askTracker(message: Any): Any = {
     try {
-      val future = trackerActor.ask(message)(timeout)
+      val future = if (trackerActor.isLeft ) {
+        trackerActor.left.get.ask(message)(timeout)
+      } else {
+        trackerActor.right.get.ask(message)(timeout)
+      }
       return Await.result(future, timeout)
     } catch {
       case e: Exception =>
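
A side note on the Either plumbing above: akka.pattern.ask works on both
ActorRef and ActorSelection (the right branch of this very hunk relies on
that), so the isLeft/left.get branching can also be written as a single fold.
A sketch of an equivalent helper (askEither is a hypothetical name, not in
this commit):

    import akka.actor.{ActorRef, ActorSelection}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Future

    // Dispatch an ask to whichever side of the Either is populated.
    def askEither(target: Either[ActorRef, ActorSelection], message: Any)
                 (implicit timeout: Timeout): Future[Any] =
      target.fold(_.ask(message), _.ask(message))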

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a267407..0d9bd50 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import collection.mutable
 import serializer.Serializer
 
-import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
+import akka.actor._
 import akka.remote.RemoteActorRefProvider
 
 import org.apache.spark.broadcast.BroadcastManager
@@ -161,17 +161,17 @@ object SparkEnv extends Logging {
     val closureSerializer = serializerManager.get(
       System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
 
-    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
+    def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
       if (isDriver) {
         logInfo("Registering " + name)
-        actorSystem.actorOf(Props(newActor), name = name)
+        Left(actorSystem.actorOf(Props(newActor), name = name))
       } else {
         val driverHost: String = System.getProperty("spark.driver.host", "localhost")
         val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
         val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
         logInfo("Connecting to " + name + ": " + url)
-        actorSystem.actorFor(url)
+        Right(actorSystem.actorSelection(url))
       }
     }
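
One consequence of registerOrLookup returning Either[ActorRef, ActorSelection]
is that callers must carry both cases around. When a concrete ActorRef is
genuinely needed on the lookup side (for example, for death watch), the
standard Akka 2.2 way to collapse a selection into a ref is the Identify
handshake. A sketch, with a hypothetical correlation id:

    import akka.actor.{Actor, ActorIdentity, ActorRef, Identify}

    class Resolver(url: String) extends Actor {
      // Ask whatever lives at the path to identify itself.
      override def preStart(): Unit =
        context.actorSelection(url) ! Identify("driver-lookup")

      def receive = {
        case ActorIdentity("driver-lookup", Some(ref)) =>
          context.watch(ref)  // a concrete ref, unlike a selection, can be watched
        case ActorIdentity("driver-lookup", None) =>
          context.stop(self)  // nothing lives at that path yet
      }
    }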
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 1643867..000d1ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -49,18 +49,14 @@ private[spark] class Client(
   var appId: String = null
 
   class ClientActor extends Actor with Logging {
-    var master: ActorRef = null
-    var masterAddress: Address = null
+    var master: ActorSelection = null
     var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
 
     override def preStart() {
       logInfo("Connecting to master " + masterUrl)
       try {
-        master = context.actorFor(Master.toAkkaUrl(masterUrl))
-        masterAddress = master.path.address
+        master = context.actorSelection(Master.toAkkaUrl(masterUrl))
         master ! RegisterApplication(appDescription)
-        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-        context.watch(master)  // Doesn't work with remote actors, but useful for testing
       } catch {
         case e: Exception =>
           logError("Failed to connect to master", e)
@@ -71,6 +67,7 @@ private[spark] class Client(
 
     override def receive = {
       case RegisteredApplication(appId_) =>
+        context.watch(sender)
         appId = appId_
         listener.connected(appId)
 
@@ -92,18 +89,8 @@ private[spark] class Client(
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
         }
 
-      case Terminated(actor_) if actor_ == master =>
-        logError("Connection to master failed; stopping client")
-        markDisconnected()
-        context.stop(self)
-
-      case DisassociatedEvent(_, address, _) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
-        markDisconnected()
-        context.stop(self)
-
-      case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
+      case Terminated(actor_) =>
+        logError(s"Connection to $actor_ dropped, stopping client")
         markDisconnected()
         context.stop(self)
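
The shape of this change is worth spelling out: context.watch takes an
ActorRef, so the ActorSelection now held in master cannot be watched directly.
Instead, the client watches sender, the master's concrete ref, once
RegisteredApplication arrives, and a single Terminated case replaces the old
RemotingLifecycleEvent handling. Compressed to its essentials (a sketch, with
hypothetical message names):

    import akka.actor.{Actor, Terminated}

    class WatchfulClient extends Actor {
      def receive = {
        case "registered" =>
          context.watch(sender)  // remote death watch on the replying master
        case Terminated(ref) =>
          context.stop(self)     // the watched master died or became unreachable
      }
    }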
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3904b70..400d6f2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -24,7 +24,7 @@ import java.io.File
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.actor._
 import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
 
 import org.apache.spark.Logging
@@ -34,6 +34,16 @@ import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
+import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
+import org.apache.spark.deploy.DeployMessages.KillExecutor
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import scala.Some
+import org.apache.spark.deploy.DeployMessages.Heartbeat
+import org.apache.spark.deploy.DeployMessages.RegisteredWorker
+import akka.remote.DisassociatedEvent
+import org.apache.spark.deploy.DeployMessages.LaunchExecutor
+import org.apache.spark.deploy.DeployMessages.RegisterWorker
 
 
 private[spark] class Worker(
@@ -54,7 +64,7 @@ private[spark] class Worker(
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
   val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
 
-  var master: ActorRef = null
+  var master: ActorSelection = null
   var masterWebUiUrl : String = ""
   val workerId = generateWorkerId()
   var sparkHome: File = null
@@ -111,10 +121,8 @@ private[spark] class Worker(
 
   def connectToMaster() {
     logInfo("Connecting to master " + masterUrl)
-    master = context.actorFor(Master.toAkkaUrl(masterUrl))
+    master = context.actorSelection(Master.toAkkaUrl(masterUrl))
     master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
-    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    context.watch(master) // Doesn't work with remote actors, but useful for testing
   }
 
   import context.dispatcher
@@ -123,6 +131,8 @@ private[spark] class Worker(
     case RegisteredWorker(url) =>
       masterWebUiUrl = url
       logInfo("Successfully registered with master")
+      context.watch(sender) // remote death watch for master
+      //TODO: Is heartbeat really necessary akka does it anyway !
         context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
         master ! Heartbeat(workerId)
       }
@@ -165,7 +175,8 @@ private[spark] class Worker(
           logInfo("Asked to kill unknown executor " + fullId)
       }
 
-    case DisassociatedEvent(_, _, _) =>
+    case Terminated(actor_) =>
+      logInfo(s"$actor_ terminated !")
       masterDisconnected()
 
     case RequestWorkerState => {
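
Incidentally, the schedule call in the RegisteredWorker case needs an
ExecutionContext, which is why the file imports context.dispatcher. The same
periodic-heartbeat idiom in isolation (a sketch; the 15-second interval
matches the default spark.worker.timeout of 60s divided by 4, the message
name is illustrative):

    import akka.actor.Actor
    import scala.concurrent.duration._

    class Heartbeater extends Actor {
      import context.dispatcher  // ExecutionContext for the scheduler

      // Deliver "heartbeat" to self every 15 seconds, starting immediately.
      private val tick = context.system.scheduler.schedule(
        0.millis, 15.seconds, self, "heartbeat")

      def receive = { case "heartbeat" => /* ping the master here */ }

      override def postStop(): Unit = tick.cancel()  // don't leak the timer
    }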

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 0c977f0..c1aa43d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,14 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.actor._
 import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -34,8 +27,16 @@ import scala.concurrent.duration._
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.storage.BlockManagerMessages.GetLocations
+import org.apache.spark.storage.BlockManagerMessages.GetLocationsMultipleBlockIds
+import org.apache.spark.storage.BlockManagerMessages.RegisterBlockManager
+import org.apache.spark.storage.BlockManagerMessages.HeartBeat
+import org.apache.spark.storage.BlockManagerMessages.RemoveExecutor
+import org.apache.spark.storage.BlockManagerMessages.GetPeers
+import org.apache.spark.storage.BlockManagerMessages.RemoveBlock
+import org.apache.spark.storage.BlockManagerMessages.RemoveRdd
 
-private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
+private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
 
   val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
   val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
@@ -165,7 +166,11 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
     while (attempts < AKKA_RETRY_ATTEMPTS) {
       attempts += 1
       try {
-        val future = driverActor.ask(message)(timeout)
+        val future = if (driverActor.isLeft ) {
+          driverActor.left.get.ask(message)(timeout)
+        } else {
+          driverActor.right.get.ask(message)(timeout)
+        }
         val result = Await.result(future, timeout)
         if (result == null) {
           throw new SparkException("BlockManagerMaster returned null")
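
The enclosing loop retries the ask up to AKKA_RETRY_ATTEMPTS times with an
AKKA_RETRY_INTERVAL_MS pause between attempts. The same shape as a reusable
helper, for illustration (retry is a hypothetical name, not in this commit):

    // Retry a synchronous operation, sleeping between attempts; the last
    // attempt's exception is allowed to propagate to the caller.
    def retry[T](attempts: Int, waitMs: Long)(op: => T): T =
      try op catch {
        case e: Exception if attempts > 1 =>
          Thread.sleep(waitMs)
          retry(attempts - 1, waitMs)(op)
      }

    // e.g. val result = retry(3, 3000) { Await.result(future, timeout) }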

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index f2ae8dd..1e6da26 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -93,7 +93,7 @@ private[spark] object ThreadingTest {
     val actorSystem = ActorSystem("test")
     val serializer = new KryoSerializer
     val blockManagerMaster = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 18fb1bf..955f6cd 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -49,14 +49,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTracker()
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker)))
+    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
     tracker.stop()
   }
 
   test("master register and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTracker()
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker)))
+    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -75,7 +75,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master register and unregister and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTracker()
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker)))
+    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -103,13 +103,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     val masterTracker = new MapOutputTracker()
-    masterTracker.trackerActor = actorSystem.actorOf(
-        Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker")
+    masterTracker.trackerActor = Left(actorSystem.actorOf(
+        Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker"))
 
     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
     val slaveTracker = new MapOutputTracker()
-    slaveTracker.trackerActor = slaveSystem.actorFor(
-        "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker")
+    slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
+        "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
 
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 038a9ac..4fdc43c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -53,7 +53,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     System.setProperty("spark.hostPort", "localhost:" + boundPort)
 
     master = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 08e399f..128711a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -86,7 +86,7 @@ class FeederActor extends Actor {
 class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
 extends Actor with Receiver {
 
-  lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+  lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
   override def preStart = remotePublisher ! SubscribeReceiver(context.self)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6860b79f/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 394a39f..b2f9f8b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -178,7 +178,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     val ip = System.getProperty("spark.driver.host", "localhost")
     val port = System.getProperty("spark.driver.port", "7077").toInt
     val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
-    val tracker = env.actorSystem.actorFor(url)
+    val tracker = env.actorSystem.actorSelection(url)
     val timeout = 5.seconds
 
     override def preStart() {