You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/02/25 21:33:24 UTC

spark git commit: [SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver

Repository: spark
Updated Branches:
  refs/heads/master 751724b13 -> fb8bb0476


[SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver

Introduces an "ask" style ```store``` in ```ActorReceiver``` as a way to allow the actor receiver to be blocked by back pressure or maxRate.

Author: Lin Zhao <li...@exabeam.com>

Closes #11176 from lin-zhao/SPARK-13069.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb8bb047
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb8bb047
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb8bb047

Branch: refs/heads/master
Commit: fb8bb04766005e8935607069c0155d639f407e8a
Parents: 751724b
Author: Lin Zhao <li...@exabeam.com>
Authored: Thu Feb 25 12:32:17 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Feb 25 12:32:24 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/akka/ActorReceiver.scala    | 39 +++++++++++++++++++-
 .../streaming/akka/JavaAkkaUtilsSuite.java      |  2 +
 .../spark/streaming/akka/AkkaUtilsSuite.scala   |  3 ++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index c75dc92..33415c1 100644
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import akka.pattern.ask
+import akka.util.Timeout
 import com.typesafe.config.ConfigFactory
 
 import org.apache.spark.{Logging, TaskContext}
@@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor {
   }
 
   /**
-   * Store a single item of received data to Spark's memory.
+   * Store a single item of received data to Spark's memory asynchronously.
    * These single items will be aggregated together into data blocks before
    * being pushed into Spark's memory.
    */
   def store[T](item: T) {
     context.parent ! SingleItemData(item)
   }
+
+  /**
+   * Store a single item of received data to Spark's memory and return a `Future`.
+   * The `Future` will be completed when the operation finishes, or with an
+   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   *
+   * This method allows the user to control the flow speed using `Future`
+   */
+  def store[T](item: T, timeout: Timeout): Future[Unit] = {
+    context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+  }
 }
 
 /**
@@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor {
   def store[T](item: T) {
     context.parent ! SingleItemData(item)
   }
+
+  /**
+   * Store a single item of received data to Spark's memory and return a `Future`.
+   * The `Future` will be completed when the operation finishes, or with an
+   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   *
+   * This method allows the user to control the flow speed using `Future`
+   */
+  def store[T](item: T, timeout: Timeout): Future[Unit] = {
+    context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+  }
 }
 
 /**
@@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int,
 /** Case class to receive data sent by child actors */
 private[akka] sealed trait ActorReceiverData
 private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData
 private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
 private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+private[akka] object Ack extends ActorReceiverData
 
 /**
  * Provides Actors as receivers for receiving stream.
@@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
         store(msg.asInstanceOf[T])
         n.incrementAndGet
 
+      case AskStoreSingleItemData(msg) =>
+        logDebug("received single sync")
+        store(msg.asInstanceOf[T])
+        n.incrementAndGet
+        sender() ! Ack
+
       case ByteBufferData(bytes) =>
         logDebug("received bytes")
         store(bytes)

http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
index b732506..ac5ef31 100644
--- a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
+++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
+import akka.util.Timeout;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.Test;
@@ -62,5 +63,6 @@ class JavaTestActor extends JavaActorReceiver {
   @Override
   public void onReceive(Object message) throws Exception {
     store((String) message);
+    store((String) message, new Timeout(1000));
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
index f437585..ce95d9d 100644
--- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
+++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.akka
 
+import scala.concurrent.duration._
+
 import akka.actor.{Props, SupervisorStrategy}
 
 import org.apache.spark.SparkFunSuite
@@ -60,5 +62,6 @@ class AkkaUtilsSuite extends SparkFunSuite {
 class TestActor extends ActorReceiver {
   override def receive: Receive = {
     case m: String => store(m)
+    case m => store(m, 10.seconds)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org