You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/06/10 15:24:12 UTC

[33/50] [abbrv] bahir git commit: [SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver

[SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver

Introduces an "ask"-style `store` in `ActorReceiver` so that an actor receiver can be blocked (slowed down) by back pressure or maxRate.

Author: Lin Zhao <li...@exabeam.com>

Closes #11176 from lin-zhao/SPARK-13069.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/6ac5a189
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/6ac5a189
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/6ac5a189

Branch: refs/heads/master
Commit: 6ac5a1893dbab0f44c2723611aa2f1e6cac82a7c
Parents: 5d45050
Author: Lin Zhao <li...@exabeam.com>
Authored: Thu Feb 25 12:32:17 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Feb 25 12:32:24 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/akka/ActorReceiver.scala    | 39 +++++++++++++++++++-
 .../streaming/akka/JavaAkkaUtilsSuite.java      |  2 +
 .../spark/streaming/akka/AkkaUtilsSuite.scala   |  3 ++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/6ac5a189/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index c75dc92..33415c1 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import akka.pattern.ask
+import akka.util.Timeout
 import com.typesafe.config.ConfigFactory
 
 import org.apache.spark.{Logging, TaskContext}
@@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor {
   }
 
   /**
-   * Store a single item of received data to Spark's memory.
+   * Store a single item of received data to Spark's memory asynchronously.
    * These single items will be aggregated together into data blocks before
    * being pushed into Spark's memory.
    */
   def store[T](item: T) {
     context.parent ! SingleItemData(item)
   }
+
+  /**
+   * Store a single item of received data to Spark's memory and return a `Future`.
+   * The `Future` will be completed when the store operation finishes, or failed with an
+   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   *
+   * This method allows the user to control the flow speed using the returned `Future`.
+   */
+  def store[T](item: T, timeout: Timeout): Future[Unit] = {
+    context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+  }
 }
 
 /**
@@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor {
   def store[T](item: T) {
     context.parent ! SingleItemData(item)
   }
+
+  /**
+   * Store a single item of received data to Spark's memory and return a `Future`.
+   * The `Future` will be completed when the store operation finishes, or failed with an
+   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   *
+   * This method allows the user to control the flow speed using the returned `Future`.
+   */
+  def store[T](item: T, timeout: Timeout): Future[Unit] = {
+    context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+  }
 }
 
 /**
@@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int,
 /** Case class to receive data sent by child actors */
 private[akka] sealed trait ActorReceiverData
 private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData
 private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
 private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+private[akka] object Ack extends ActorReceiverData
 
 /**
  * Provides Actors as receivers for receiving stream.
@@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
         store(msg.asInstanceOf[T])
         n.incrementAndGet
 
+      case AskStoreSingleItemData(msg) =>
+        logDebug("received single sync")
+        store(msg.asInstanceOf[T])
+        n.incrementAndGet
+        sender() ! Ack
+
       case ByteBufferData(bytes) =>
         logDebug("received bytes")
         store(bytes)

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ac5a189/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
index b732506..ac5ef31 100644
--- a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
+++ b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
+import akka.util.Timeout;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.Test;
@@ -62,5 +63,6 @@ class JavaTestActor extends JavaActorReceiver {
   @Override
   public void onReceive(Object message) throws Exception {
     store((String) message);
+    store((String) message, new Timeout(1000));
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ac5a189/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
index f437585..ce95d9d 100644
--- a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.akka
 
+import scala.concurrent.duration._
+
 import akka.actor.{Props, SupervisorStrategy}
 
 import org.apache.spark.SparkFunSuite
@@ -60,5 +62,6 @@ class AkkaUtilsSuite extends SparkFunSuite {
 class TestActor extends ActorReceiver {
   override def receive: Receive = {
     case m: String => store(m)
+    case m => store(m, 10.seconds)
   }
 }