You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/20 05:35:45 UTC

spark git commit: [SPARK-6983][Streaming] Update ReceiverTrackerActor to use the new Rpc interface

Repository: spark
Updated Branches:
  refs/heads/master fa73da024 -> d8e1b7b06


[SPARK-6983][Streaming] Update ReceiverTrackerActor to use the new Rpc interface

A subtask of [SPARK-5293](https://issues.apache.org/jira/browse/SPARK-5293)

Author: zsxwing <zs...@gmail.com>

Closes #5557 from zsxwing/SPARK-6983 and squashes the following commits:

e777e9f [zsxwing] Update ReceiverTrackerActor to use the new Rpc interface


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8e1b7b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8e1b7b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8e1b7b0

Branch: refs/heads/master
Commit: d8e1b7b06c499289ff3ce5ec91ff354493a17c48
Parents: fa73da0
Author: zsxwing <zs...@gmail.com>
Authored: Sun Apr 19 20:35:43 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Apr 19 20:35:43 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rpc/RpcEnv.scala     |  2 +-
 .../receiver/ReceiverSupervisorImpl.scala       | 52 +++++-----------
 .../streaming/scheduler/ReceiverInfo.scala      |  4 +-
 .../streaming/scheduler/ReceiverTracker.scala   | 64 ++++++++++----------
 4 files changed, 52 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8e1b7b0/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index f2c1c86..cba038c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -258,7 +258,7 @@ private[spark] trait RpcEndpoint {
   final def stop(): Unit = {
     val _self = self
     if (_self != null) {
-      rpcEnv.stop(self)
+      rpcEnv.stop(_self)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d8e1b7b0/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 8f2f1fe..89af403 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -21,18 +21,16 @@ import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Await
 
-import akka.actor.{ActorRef, Actor, Props}
-import akka.pattern.ask
 import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{Logging, SparkEnv, SparkException}
+import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -63,37 +61,23 @@ private[streaming] class ReceiverSupervisorImpl(
   }
 
 
-  /** Remote Akka actor for the ReceiverTracker */
-  private val trackerActor = {
-    val ip = env.conf.get("spark.driver.host", "localhost")
-    val port = env.conf.getInt("spark.driver.port", 7077)
-    val url = AkkaUtils.address(
-      AkkaUtils.protocol(env.actorSystem),
-      SparkEnv.driverActorSystemName,
-      ip,
-      port,
-      "ReceiverTracker")
-    env.actorSystem.actorSelection(url)
-  }
-
-  /** Timeout for Akka actor messages */
-  private val askTimeout = AkkaUtils.askTimeout(env.conf)
+  /** Remote RpcEndpointRef for the ReceiverTracker */
+  private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)
 
-  /** Akka actor for receiving messages from the ReceiverTracker in the driver */
-  private val actor = env.actorSystem.actorOf(
-    Props(new Actor {
+  /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
+  private val endpoint = env.rpcEnv.setupEndpoint(
+    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
+      override val rpcEnv: RpcEnv = env.rpcEnv
 
       override def receive: PartialFunction[Any, Unit] = {
         case StopReceiver =>
           logInfo("Received stop signal")
-          stop("Stopped by driver", None)
+          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
         case CleanupOldBlocks(threshTime) =>
           logDebug("Received delete old batch signal")
           cleanupOldBlocks(threshTime)
       }
-
-      def ref: ActorRef = self
-    }), "Receiver-" + streamId + "-" + System.currentTimeMillis())
+    })
 
   /** Unique block ids if one wants to add blocks directly */
   private val newBlockId = new AtomicLong(System.currentTimeMillis())
@@ -162,15 +146,14 @@ private[streaming] class ReceiverSupervisorImpl(
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
 
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
-    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
-    Await.result(future, askTimeout)
+    trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }
 
   /** Report error to the receiver tracker */
   def reportError(message: String, error: Throwable) {
     val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
-    trackerActor ! ReportError(streamId, message, errorString)
+    trackerEndpoint.send(ReportError(streamId, message, errorString))
     logWarning("Reported error " + message + " - " + error)
   }
 
@@ -180,22 +163,19 @@ private[streaming] class ReceiverSupervisorImpl(
 
   override protected def onStop(message: String, error: Option[Throwable]) {
     blockGenerator.stop()
-    env.actorSystem.stop(actor)
+    env.rpcEnv.stop(endpoint)
   }
 
   override protected def onReceiverStart() {
     val msg = RegisterReceiver(
-      streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
-    val future = trackerActor.ask(msg)(askTimeout)
-    Await.result(future, askTimeout)
+      streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
+    trackerEndpoint.askWithReply[Boolean](msg)
   }
 
   override protected def onReceiverStop(message: String, error: Option[Throwable]) {
     logInfo("Deregistering receiver " + streamId)
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
-    val future = trackerActor.ask(
-      DeregisterReceiver(streamId, message, errorString))(askTimeout)
-    Await.result(future, askTimeout)
+    trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo("Stopped receiver " + streamId)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d8e1b7b0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index d7e39c5..52f08b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
-import akka.actor.ActorRef
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rpc.RpcEndpointRef
 
 /**
  * :: DeveloperApi ::
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 case class ReceiverInfo(
     streamId: Int,
     name: String,
-    private[streaming] val actor: ActorRef,
+    private[streaming] val endpoint: RpcEndpointRef,
     active: Boolean,
     location: String,
     lastErrorMessage: String = "",

http://git-wip-us.apache.org/repos/asf/spark/blob/d8e1b7b0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 9890047..c4ead6f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.streaming.scheduler
 
-
 import scala.collection.mutable.{HashMap, SynchronizedMap}
 import scala.language.existentials
 
-import akka.actor._
-
 import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
+import org.apache.spark.rpc._
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}
 
@@ -36,7 +34,7 @@ private[streaming] case class RegisterReceiver(
     streamId: Int,
     typ: String,
     host: String,
-    receiverActor: ActorRef
+    receiverEndpoint: RpcEndpointRef
   ) extends ReceiverTrackerMessage
 private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
   extends ReceiverTrackerMessage
@@ -67,19 +65,19 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   )
   private val listenerBus = ssc.scheduler.listenerBus
 
-  // actor is created when generator starts.
+  // endpoint is created when generator starts.
   // This not being null means the tracker has been started and not stopped
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
 
-  /** Start the actor and receiver execution thread. */
+  /** Start the endpoint and receiver execution thread. */
   def start(): Unit = synchronized {
-    if (actor != null) {
+    if (endpoint != null) {
       throw new SparkException("ReceiverTracker already started")
     }
 
     if (!receiverInputStreams.isEmpty) {
-      actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
-        "ReceiverTracker")
+      endpoint = ssc.env.rpcEnv.setupEndpoint(
+        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
       if (!skipReceiverLaunch) receiverExecutor.start()
       logInfo("ReceiverTracker started")
     }
@@ -87,13 +85,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
   /** Stop the receiver execution thread. */
   def stop(graceful: Boolean): Unit = synchronized {
-    if (!receiverInputStreams.isEmpty && actor != null) {
+    if (!receiverInputStreams.isEmpty && endpoint != null) {
       // First, stop the receivers
       if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
 
-      // Finally, stop the actor
-      ssc.env.actorSystem.stop(actor)
-      actor = null
+      // Finally, stop the endpoint
+      ssc.env.rpcEnv.stop(endpoint)
+      endpoint = null
       receivedBlockTracker.stop()
       logInfo("ReceiverTracker stopped")
     }
@@ -129,8 +127,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     // Signal the receivers to delete old block data
     if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
       logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
-      receiverInfo.values.flatMap { info => Option(info.actor) }
-        .foreach { _ ! CleanupOldBlocks(cleanupThreshTime) }
+      receiverInfo.values.flatMap { info => Option(info.endpoint) }
+        .foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) }
     }
   }
 
@@ -139,23 +137,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       streamId: Int,
       typ: String,
       host: String,
-      receiverActor: ActorRef,
-      sender: ActorRef
+      receiverEndpoint: RpcEndpointRef,
+      senderAddress: RpcAddress
     ) {
     if (!receiverInputStreamIds.contains(streamId)) {
       throw new SparkException("Register received for unexpected id " + streamId)
     }
     receiverInfo(streamId) = ReceiverInfo(
-      streamId, s"${typ}-${streamId}", receiverActor, true, host)
+      streamId, s"${typ}-${streamId}", receiverEndpoint, true, host)
     listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
-    logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
+    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
   }
 
   /** Deregister a receiver */
   private def deregisterReceiver(streamId: Int, message: String, error: String) {
     val newReceiverInfo = receiverInfo.get(streamId) match {
       case Some(oldInfo) =>
-        oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
+        oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error)
       case None =>
         logWarning("No prior receiver info")
         ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
@@ -199,19 +197,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     receivedBlockTracker.hasUnallocatedReceivedBlocks
   }
 
-  /** Actor to receive messages from the receivers. */
-  private class ReceiverTrackerActor extends Actor {
+  /** RpcEndpoint to receive messages from the receivers. */
+  private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+
     override def receive: PartialFunction[Any, Unit] = {
-      case RegisterReceiver(streamId, typ, host, receiverActor) =>
-        registerReceiver(streamId, typ, host, receiverActor, sender)
-        sender ! true
-      case AddBlock(receivedBlockInfo) =>
-        sender ! addBlock(receivedBlockInfo)
       case ReportError(streamId, message, error) =>
         reportError(streamId, message, error)
+    }
+
+    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+      case RegisterReceiver(streamId, typ, host, receiverEndpoint) =>
+        registerReceiver(streamId, typ, host, receiverEndpoint, context.sender.address)
+        context.reply(true)
+      case AddBlock(receivedBlockInfo) =>
+        context.reply(addBlock(receivedBlockInfo))
       case DeregisterReceiver(streamId, message, error) =>
         deregisterReceiver(streamId, message, error)
-        sender ! true
+        context.reply(true)
     }
   }
 
@@ -314,8 +316,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     /** Stops the receivers. */
     private def stopReceivers() {
       // Signal the receivers to stop
-      receiverInfo.values.flatMap { info => Option(info.actor)}
-                         .foreach { _ ! StopReceiver }
+      receiverInfo.values.flatMap { info => Option(info.endpoint)}
+                         .foreach { _.send(StopReceiver) }
       logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org