Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/08 06:56:46 UTC

[1/4] git commit: Get rid of `Either[ActorRef, ActorSelection]'

Updated Branches:
  refs/heads/master 11891e68c -> f5f12dc28


Get rid of `Either[ActorRef, ActorSelection]'

Although we can send messages via an ActorSelection, it is better to identify the actor and obtain an ActorRef first, so that we are notified earlier if the remote actor doesn't exist, and can get rid of the annoying Either wrapper.
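
In miniature, the resulting pattern looks roughly like this (an illustrative, self-contained sketch, not part of the commit: the `Tracker` actor, the "demo" system, and the path are invented; the API is Akka 2.2.x-era):

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask

    // A toy actor standing in for a driver-side tracker actor.
    class Tracker extends Actor {
      def receive = { case msg => sender ! "ack: " + msg }
    }

    object ResolvePatternDemo extends App {
      val system = ActorSystem("demo")
      val timeout = 30.seconds
      system.actorOf(Props[Tracker], name = "tracker")

      // Before: an ActorSelection accepts messages even when nothing lives
      // at the path; a wrong address only surfaces later, e.g. as an ask
      // timeout.
      val selection = system.actorSelection("akka://demo/user/tracker")

      // After: resolve the selection into a concrete ActorRef once, up
      // front. The call blocks briefly but fails fast if the actor doesn't
      // exist.
      val ref: ActorRef = Await.result(selection.resolveOne(timeout), timeout)

      // Callers now hold a plain ActorRef -- no Either wrapper needed.
      println(Await.result(ref.ask("hello")(timeout), timeout))

      system.shutdown()
    }

The resolution blocks once, at construction time; afterwards every caller holds a plain ActorRef, and a dead path fails fast instead of surfacing as a silent message drop.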


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a4048ff3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a4048ff3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a4048ff3

Branch: refs/heads/master
Commit: a4048ff31e6f8d3e1451d8ae2d5b9edee42cfbbe
Parents: d43ad3e
Author: Lian, Cheng <rh...@gmail.com>
Authored: Mon Jan 6 09:18:17 2014 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Mon Jan 6 09:18:17 2014 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/MapOutputTracker.scala   | 14 ++------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 16 ++++++++--------
 .../apache/spark/storage/BlockManagerMaster.scala   |  8 ++------
 .../org/apache/spark/storage/ThreadingTest.scala    |  2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala     |  5 +++++
 .../spark/storage/DiskBlockManagerSuite.scala       |  4 +---
 6 files changed, 19 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a4048ff3/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index cdae167..77b8ca1 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
   private val timeout = AkkaUtils.askTimeout(conf)
 
   // Set to the MapOutputTrackerActor living on the driver
-  var trackerActor: Either[ActorRef, ActorSelection] = _
+  var trackerActor: ActorRef = _
 
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
 
@@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
   // throw a SparkException if this fails.
   private def askTracker(message: Any): Any = {
     try {
-      /*
-        The difference between ActorRef and ActorSelection is well explained here:
-        http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
-        In spark a map output tracker can be either started on Driver where it is created which
-        is an ActorRef or it can be on executor from where it is looked up which is an
-        actorSelection.
-       */
-      val future = trackerActor match {
-        case Left(a: ActorRef) => a.ask(message)(timeout)
-        case Right(b: ActorSelection) => b.ask(message)(timeout)
-      }
+      val future = trackerActor.ask(message)(timeout)
       Await.result(future, timeout)
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a4048ff3/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 634a94f..2e36ccb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark
 
-import collection.mutable
-import serializer.Serializer
+import scala.collection.mutable
+import scala.concurrent.Await
 
 import akka.actor._
-import akka.remote.RemoteActorRefProvider
 
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
@@ -157,17 +156,18 @@ object SparkEnv extends Logging {
       conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
       conf)
 
-    def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
+    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
       if (isDriver) {
         logInfo("Registering " + name)
-        Left(actorSystem.actorOf(Props(newActor), name = name))
+        actorSystem.actorOf(Props(newActor), name = name)
       } else {
         val driverHost: String = conf.get("spark.driver.host", "localhost")
         val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
-        val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
-        logInfo("Connecting to " + name + ": " + url)
-        Right(actorSystem.actorSelection(url))
+        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+        val timeout = AkkaUtils.lookupTimeout(conf)
+        logInfo(s"Connecting to $name: $url")
+        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a4048ff3/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b5afe8c..51a29ed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
-    conf: SparkConf) extends Logging {
+class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
 
   val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
   val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
@@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
     while (attempts < AKKA_RETRY_ATTEMPTS) {
       attempts += 1
       try {
-        val future = driverActor match {
-          case Left(a: ActorRef) => a.ask(message)(timeout)
-          case Right(b: ActorSelection) => b.ask(message)(timeout)
-        }
+        val future = driverActor.ask(message)(timeout)
         val result = Await.result(future, timeout)
         if (result == null) {
           throw new SparkException("BlockManagerMaster returned null")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a4048ff3/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index dca98c6..729ba2c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -95,7 +95,7 @@ private[spark] object ThreadingTest {
     val conf = new SparkConf()
     val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
-      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a4048ff3/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 362cea5..b4c4e1d 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -91,4 +91,9 @@ private[spark] object AkkaUtils {
   def askTimeout(conf: SparkConf): FiniteDuration = {
     Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
   }
+
+  /** Returns the default Spark timeout to use for Akka remote actor lookup. */
+  def lookupTimeout(conf: SparkConf): FiniteDuration = {
+    Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a4048ff3/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index af4b31d..829f389 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -23,9 +23,7 @@ import scala.collection.mutable
 
 import com.google.common.io.Files
 import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import scala.util.Try
-import akka.actor.{Props, ActorSelection, ActorSystem}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
   private val testConf = new SparkConf(false)


[4/4] git commit: Merge pull request #336 from liancheng/akka-remote-lookup

Posted by pw...@apache.org.
Merge pull request #336 from liancheng/akka-remote-lookup

Get rid of `Either[ActorRef, ActorSelection]'

In this pull request, instead of returning an `Either[ActorRef, ActorSelection]`, `registerOrLookup` identifies the remote actor with a blocking call to obtain an `ActorRef`, and throws an exception if the remote actor doesn't exist or the lookup times out (configurable via `spark.akka.lookupTimeout`).  This function is only called when a `SparkEnv` is constructed (i.e. when instantiating the driver or an executor), so the blocking call is considered acceptable.  Executor-side references to the driver-side `MapOutputTrackerMasterActor` and `BlockManagerMasterActor` are affected by this pull request.
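
As a rough illustration of those failure modes (again a self-contained sketch: the actor system and path are invented, and the 5-second timeout stands in for `spark.akka.lookupTimeout`, which defaults to 30 seconds):

    import java.util.concurrent.TimeoutException

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import akka.actor.{ActorNotFound, ActorSystem}

    object LookupFailureDemo extends App {
      val system = ActorSystem("demo")
      // Stands in for spark.akka.lookupTimeout (default: 30 seconds).
      val timeout = 5.seconds
      try {
        // Nothing lives at this path, so the resolution future fails and
        // Await.result rethrows the failure.
        Await.result(
          system.actorSelection("akka://demo/user/no-such-actor").resolveOne(timeout),
          timeout)
      } catch {
        case _: ActorNotFound    => println("remote actor doesn't exist")
        case _: TimeoutException => println("lookup timed out")
      } finally {
        system.shutdown()
      }
    }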

`ActorSelection` is dangerous and should be used with care.  It's only absolutely safe to send messages via an `ActorSelection` when the remote actor is stateless, so that actor incarnation is irrelevant.  But as pointed out by @ScrapCodes in the comments below, the executor exits immediately once its connection to the driver is lost, so `ActorSelection`s are not harmful in this scenario.  This pull request is therefore mostly a code style patch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f5f12dc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f5f12dc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f5f12dc2

Branch: refs/heads/master
Commit: f5f12dc28218f3ed89836434ab0530e88b043e47
Parents: 11891e6 eb24684
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jan 7 21:56:35 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 7 21:56:35 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/MapOutputTracker.scala  | 14 ++------------
 .../src/main/scala/org/apache/spark/SparkEnv.scala | 16 ++++++++--------
 .../apache/spark/storage/BlockManagerMaster.scala  |  8 ++------
 .../org/apache/spark/storage/ThreadingTest.scala   |  2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala    |  5 +++++
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 17 ++++++++++-------
 .../apache/spark/storage/BlockManagerSuite.scala   |  2 +-
 .../spark/storage/DiskBlockManagerSuite.scala      |  4 +---
 8 files changed, 30 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f5f12dc2/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f5f12dc2/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index fded582,032c2f2..f60ce27
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@@ -56,10 -58,10 +56,10 @@@ class BlockManagerSuite extends FunSuit
      conf.set("spark.hostPort", "localhost:" + boundPort)
  
      master = new BlockManagerMaster(
-       Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
  
      // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
 -    System.setProperty("os.arch", "amd64")
 +    oldArch = System.setProperty("os.arch", "amd64")
      conf.set("os.arch", "amd64")
      conf.set("spark.test.useCompressedOops", "true")
      conf.set("spark.storage.disableBlockManagerHeartBeat", "true")


[2/4] git commit: Fixed several compilation errors in test suites

Posted by pw...@apache.org.
Fixed several compilation errors in test suites


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5c152e3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5c152e3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5c152e3e

Branch: refs/heads/master
Commit: 5c152e3e219a44f97d9df38ba00cdc4adbf4d873
Parents: a4048ff
Author: Lian, Cheng <rh...@gmail.com>
Authored: Mon Jan 6 10:39:05 2014 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Mon Jan 6 10:39:05 2014 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/MapOutputTrackerSuite.scala   | 11 +++++++----
 .../org/apache/spark/storage/BlockManagerSuite.scala     |  2 +-
 2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c152e3e/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 10b8b44..82dc30e 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -23,6 +23,7 @@ import akka.actor._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
+import scala.concurrent.Await
 
 class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   private val conf = new SparkConf
@@ -101,13 +102,15 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = Left(actorSystem.actorOf(
-        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
+    masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
 
     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
     val slaveTracker = new MapOutputTracker(conf)
-    slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
-        "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c152e3e/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a0fc344..032c2f2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -58,7 +58,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     conf.set("spark.hostPort", "localhost:" + boundPort)
 
     master = new BlockManagerMaster(
-      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     System.setProperty("os.arch", "amd64")


[3/4] git commit: Fixed test suite compilation errors

Posted by pw...@apache.org.
Fixed test suite compilation errors


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/eb246847
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/eb246847
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/eb246847

Branch: refs/heads/master
Commit: eb24684748da5dc2495fc4afe6da58edb463294b
Parents: 5c152e3
Author: Lian, Cheng <rh...@gmail.com>
Authored: Mon Jan 6 11:21:35 2014 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Mon Jan 6 11:26:59 2014 +0800

----------------------------------------------------------------------
 .../test/scala/org/apache/spark/MapOutputTrackerSuite.scala    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/eb246847/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 82dc30e..afc1bef 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -50,14 +50,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.stop()
   }
 
   test("master register and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -76,7 +76,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master register and unregister and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)