Posted to commits@spark.apache.org by td...@apache.org on 2016/01/20 22:55:48 UTC

spark git commit: [SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project

Repository: spark
Updated Branches:
  refs/heads/master 944fdadf7 -> b7d74a602


[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project

Include the following changes:

1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly

Author: Shixiong Zhu <sh...@databricks.com>

Closes #10744 from zsxwing/streaming-akka-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7d74a60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7d74a60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7d74a60

Branch: refs/heads/master
Commit: b7d74a602f622d8e105b349bd6d17ba42e7668dc
Parents: 944fdad
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Jan 20 13:55:41 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jan 20 13:55:41 2016 -0800

----------------------------------------------------------------------
 dev/sparktestsupport/modules.py                 |  12 +
 docs/streaming-custom-receivers.md              |  49 +++-
 docs/streaming-programming-guide.md             |   4 +-
 examples/pom.xml                                |   5 +
 .../examples/streaming/JavaActorWordCount.java  |  14 +-
 .../examples/streaming/ActorWordCount.scala     |  37 +--
 .../examples/streaming/ZeroMQWordCount.scala    |  13 +-
 external/akka/pom.xml                           |  73 +++++
 .../spark/streaming/akka/ActorReceiver.scala    | 269 +++++++++++++++++++
 .../apache/spark/streaming/akka/AkkaUtils.scala | 147 ++++++++++
 .../streaming/akka/JavaAkkaUtilsSuite.java      |  66 +++++
 .../spark/streaming/akka/AkkaUtilsSuite.scala   |  64 +++++
 external/zeromq/pom.xml                         |   5 +
 .../spark/streaming/zeromq/ZeroMQReceiver.scala |   2 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |  76 ++++--
 .../streaming/zeromq/JavaZeroMQStreamSuite.java |  31 ++-
 .../streaming/zeromq/ZeroMQStreamSuite.scala    |  16 +-
 pom.xml                                         |   1 +
 project/MimaExcludes.scala                      |  10 +
 project/SparkBuild.scala                        |   9 +-
 .../spark/streaming/StreamingContext.scala      |  24 +-
 .../api/java/JavaStreamingContext.scala         |  64 -----
 .../streaming/receiver/ActorReceiver.scala      | 245 -----------------
 23 files changed, 826 insertions(+), 410 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 93a8c15..efe58ea 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -222,6 +222,18 @@ streaming_flume_sink = Module(
 )
 
 
+streaming_akka = Module(
+    name="streaming-akka",
+    dependencies=[streaming],
+    source_file_regexes=[
+        "external/akka",
+    ],
+    sbt_test_goals=[
+        "streaming-akka/test",
+    ]
+)
+
+
 streaming_flume = Module(
     name="streaming-flume",
     dependencies=[streaming],

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/docs/streaming-custom-receivers.md
----------------------------------------------------------------------
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 97db865..95b9986 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -257,25 +257,54 @@ The following table summarizes the characteristics of both types of receivers
 
 ## Implementing and Using a Custom Actor-based Receiver
 
+<div class="codetabs">
+<div data-lang="scala"  markdown="1" >
+
 Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
-receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
-trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using
- `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
+receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver)
+allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
 
 {% highlight scala %}
-class CustomActor extends Actor with ActorHelper {
+
+class CustomActor extends ActorReceiver {
   def receive = {
     case data: String => store(data)
   }
 }
+
+// A new input stream can be created with this custom actor as
+val ssc: StreamingContext = ...
+val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
+
 {% endhighlight %}
 
-And a new input stream can be created with this custom actor as
+See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example.
+</div>
+<div data-lang="java" markdown="1">
+
+Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to
+receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
+allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+{% highlight java %}
+
+class CustomActor extends JavaActorReceiver {
+  @Override
+  public void onReceive(Object msg) throws Exception {
+    store((String) msg);
+  }
+}
+
+// A new input stream can be created with this custom actor as
+JavaStreamingContext jssc = ...;
+JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
 
-{% highlight scala %}
-val ssc: StreamingContext = ...
-val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
 {% endhighlight %}
 
-See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
-for an end-to-end example.
+See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
+</div>
+</div>
+
+<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API.

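The docs above note that the supervisor strategy is configurable. One way to pass a
custom strategy through the new API (a sketch; the retry values are illustrative, and
ssc/CustomActor are as defined above):

    import scala.concurrent.duration._
    import akka.actor.{OneForOneStrategy, Props}
    import akka.actor.SupervisorStrategy.{Escalate, Restart}
    import org.apache.spark.streaming.akka.AkkaUtils

    // Restart the receiver actor on RuntimeException, escalate anything else.
    val strategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: RuntimeException => Restart
      case _: Exception => Escalate
    }
    val lines = AkkaUtils.createStream[String](
      ssc, Props[CustomActor](), "CustomReceiver", supervisorStrategy = strategy)
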
http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 8fd075d..93c34ef 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -659,11 +659,11 @@ methods for creating DStreams from files and Akka actors as input sources.
 	<span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only	`textFileStream` is	available.
 
 - **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
-  actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
+  actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver
   Guide](streaming-custom-receivers.html) for more details.
 
   <span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala
-  libraries, `actorStream` is not available in the Python API.
+  libraries, `AkkaUtils.createStream` is not available in the Python API.
 
 - **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 1a0d5e5..9437cee 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -77,6 +77,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>

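Applications outside the Spark build would link against the new artifact like any
other external module; an sbt sketch (coordinates taken from the pom entries above,
with the version matching the Spark release in use):

    libraryDependencies += "org.apache.spark" %% "spark-streaming-akka" % "2.0.0-SNAPSHOT"
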
http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 2377207..62e5633 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.JavaActorReceiver;
+import org.apache.spark.streaming.akka.AkkaUtils;
+import org.apache.spark.streaming.akka.JavaActorReceiver;
 
 /**
  * A sample actor as receiver, is also simplest. This receiver actor
@@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
     remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
   }
 
+  @Override
   public void onReceive(Object msg) throws Exception {
     store((T) msg);
   }
@@ -100,18 +102,20 @@ public class JavaActorWordCount {
     String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
 
     /*
-     * Following is the use of actorStream to plug in custom actor as receiver
+     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
      *
      * An important point to note:
      * Since Actor may exist outside the spark framework, It is thus user's responsibility
      * to ensure the type safety, i.e type of data received and InputDstream
      * should be same.
      *
-     * For example: Both actorStream and JavaSampleActorReceiver are parameterized
+     * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
      * to same type to ensure type safety.
      */
-    JavaDStream<String> lines = jssc.actorStream(
-        Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
+    JavaDStream<String> lines = AkkaUtils.createStream(
+        jssc,
+        Props.create(JavaSampleActorReceiver.class, feederActorURI),
+        "SampleReceiver");
 
     // compute wordcount
     lines.flatMap(new FlatMapFunction<String, String>() {

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index 88cdc6b..8e88987 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
 import scala.reflect.ClassTag
 import scala.util.Random
 
-import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
+import akka.actor._
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.receiver.ActorReceiver
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 
 case class SubscribeReceiver(receiverActor: ActorRef)
 case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -78,8 +78,7 @@ class FeederActor extends Actor {
  *
  * @see [[org.apache.spark.examples.streaming.FeederActor]]
  */
-class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends ActorReceiver {
+class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
@@ -108,9 +107,13 @@ object FeederActor {
     }
     val Seq(host, port) = args.toSeq
 
-    val conf = new SparkConf
-    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
-      securityManager = new SecurityManager(conf))._1
+    val akkaConf = ConfigFactory.parseString(
+      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+         |akka.remote.netty.tcp.hostname = "$host"
+         |akka.remote.netty.tcp.port = $port
+         |""".stripMargin)
+    val actorSystem = ActorSystem("test", akkaConf)
     val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
 
     println("Feeder started as:" + feeder)
@@ -121,6 +124,7 @@ object FeederActor {
 
 /**
  * A sample word count program demonstrating the use of plugging in
+ *
  * Actor as Receiver
  * Usage: ActorWordCount <hostname> <port>
  *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
@@ -146,20 +150,21 @@ object ActorWordCount {
     val ssc = new StreamingContext(sparkConf, Seconds(2))
 
     /*
-     * Following is the use of actorStream to plug in custom actor as receiver
+     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
      *
      * An important point to note:
      * Since Actor may exist outside the spark framework, It is thus user's responsibility
-     * to ensure the type safety, i.e type of data received and InputDstream
+     * to ensure the type safety, i.e type of data received and InputDStream
      * should be same.
      *
-     * For example: Both actorStream and SampleActorReceiver are parameterized
+     * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
      * to same type to ensure type safety.
      */
-
-    val lines = ssc.actorStream[String](
-      Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
-        host, port.toInt))), "SampleReceiver")
+    val lines = AkkaUtils.createStream[String](
+      ssc,
+      Props(classOf[SampleActorReceiver[String]],
+        "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
+      "SampleReceiver")
 
     // compute wordcount
     lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 9644890..f612e50 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
 import akka.util.ByteString
 import akka.zeromq._
 import akka.zeromq.Subscribe
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.zeromq._
 
@@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
  *
  * To run this example locally, you may run publisher as
  *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ *      org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
  * and run the example as
  *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ *      org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
  */
 // scalastyle:on
 object ZeroMQWordCount {
@@ -90,7 +91,11 @@ object ZeroMQWordCount {
     def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
 
     // For this stream, a zeroMQ publisher should be running.
-    val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
+    val lines = ZeroMQUtils.createStream(
+      ssc,
+      url,
+      Subscribe(topic),
+      bytesToStringIterator _)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/akka/pom.xml
----------------------------------------------------------------------
diff --git a/external/akka/pom.xml b/external/akka/pom.xml
new file mode 100644
index 0000000..34de9ba
--- /dev/null
+++ b/external/akka/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-akka_2.10</artifactId>
+  <properties>
+    <sbt.project.name>streaming-akka</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Project External Akka</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor_${scala.binary.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-remote_${scala.binary.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
new file mode 100644
index 0000000..c75dc92
--- /dev/null
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import akka.actor._
+import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import com.typesafe.config.ConfigFactory
+
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * :: DeveloperApi ::
+ * A helper with set of defaults for supervisor strategy
+ */
+@DeveloperApi
+object ActorReceiver {
+
+  /**
+   * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
+   * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
+   * others, it just escalates the failure to the supervisor of the supervisor.
+   */
+  val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+    15 millis) {
+    case _: RuntimeException => Restart
+    case _: Exception => Escalate
+  }
+
+  /**
+   * A default ActorSystem creator. It will use a unique system name
+   * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
+   * communication.
+   */
+  val defaultActorSystemCreator: () => ActorSystem = () => {
+    val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+    val akkaConf = ConfigFactory.parseString(
+      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+         |""".stripMargin)
+    ActorSystem(uniqueSystemName, akkaConf)
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ * A base Actor that provides APIs for pushing received data into Spark Streaming for processing.
+ *
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ *
+ * @example {{{
+ *  class MyActor extends ActorReceiver {
+ *      def receive {
+ *          case anything: String => store(anything)
+ *      }
+ *  }
+ *
+ *  AkkaUtils.createStream[String](ssc, Props[MyActor](), "MyActorReceiver")
+ *
+ * }}}
+ *
+ * @note Since the actor may exist outside the Spark framework, it is the user's
+ *       responsibility to ensure type safety, i.e. the parametrized type of the push
+ *       block and the InputDStream should be the same.
+ */
+@DeveloperApi
+abstract class ActorReceiver extends Actor {
+
+  /** Store an iterator of received data as a data block into Spark's memory. */
+  def store[T](iter: Iterator[T]) {
+    context.parent ! IteratorData(iter)
+  }
+
+  /**
+   * Store the bytes of received data as a data block into Spark's memory. Note
+   * that the data in the ByteBuffer must be serialized using the same serializer
+   * that Spark is configured to use.
+   */
+  def store(bytes: ByteBuffer) {
+    context.parent ! ByteBufferData(bytes)
+  }
+
+  /**
+   * Store a single item of received data to Spark's memory.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   */
+  def store[T](item: T) {
+    context.parent ! SingleItemData(item)
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for
+ * processing.
+ *
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ *
+ * @example {{{
+ *  class MyActor extends JavaActorReceiver {
+ *      @Override
+ *      public void onReceive(Object msg) throws Exception {
+ *          store((String) msg);
+ *      }
+ *  }
+ *
+ *  AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
+ *
+ * }}}
+ *
+ * @note Since the actor may exist outside the Spark framework, it is the user's
+ *       responsibility to ensure type safety, i.e. the parametrized type of the push
+ *       block and the InputDStream should be the same.
+ */
+@DeveloperApi
+abstract class JavaActorReceiver extends UntypedActor {
+
+  /** Store an iterator of received data as a data block into Spark's memory. */
+  def store[T](iter: Iterator[T]) {
+    context.parent ! IteratorData(iter)
+  }
+
+  /**
+   * Store the bytes of received data as a data block into Spark's memory. Note
+   * that the data in the ByteBuffer must be serialized using the same serializer
+   * that Spark is configured to use.
+   */
+  def store(bytes: ByteBuffer) {
+    context.parent ! ByteBufferData(bytes)
+  }
+
+  /**
+   * Store a single item of received data to Spark's memory.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   */
+  def store[T](item: T) {
+    context.parent ! SingleItemData(item)
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Statistics for querying the supervisor about state of workers. Used in
+ * conjunction with `AkkaUtils.createStream` and
+ * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]].
+ */
+@DeveloperApi
+case class Statistics(numberOfMsgs: Int,
+  numberOfWorkers: Int,
+  numberOfHiccups: Int,
+  otherInfo: String)
+
+/** Case classes to receive data sent by child actors */
+private[akka] sealed trait ActorReceiverData
+private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
+private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+
+/**
+ * Provides Actors as receivers for a receiving stream.
+ *
+ * Actors can be used to receive data from almost any stream source.
+ * A nice set of abstractions for actors as receivers is already provided for
+ * a few general cases. It is thus exposed as an API where users may supply
+ * their own Actor to run as a receiver for a Spark Streaming input source.
+ *
+ * This starts a supervisor actor which starts workers and also provides
+ * [[http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]].
+ *
+ * Here's a way to start more supervisor/workers as its children.
+ *
+ * @example {{{
+ *  context.parent ! Props(new Supervisor)
+ * }}} OR {{{
+ *  context.parent ! (Props(new Worker), "Worker")
+ * }}}
+ */
+private[akka] class ActorReceiverSupervisor[T: ClassTag](
+    actorSystemCreator: () => ActorSystem,
+    props: Props,
+    name: String,
+    storageLevel: StorageLevel,
+    receiverSupervisorStrategy: SupervisorStrategy
+  ) extends Receiver[T](storageLevel) with Logging {
+
+  private lazy val actorSystem = actorSystemCreator()
+  protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
+    "Supervisor" + streamId)
+
+  class Supervisor extends Actor {
+
+    override val supervisorStrategy = receiverSupervisorStrategy
+    private val worker = context.actorOf(props, name)
+    logInfo("Started receiver worker at:" + worker.path)
+
+    private val n: AtomicInteger = new AtomicInteger(0)
+    private val hiccups: AtomicInteger = new AtomicInteger(0)
+
+    override def receive: PartialFunction[Any, Unit] = {
+
+      case IteratorData(iterator) =>
+        logDebug("received iterator")
+        store(iterator.asInstanceOf[Iterator[T]])
+
+      case SingleItemData(msg) =>
+        logDebug("received single")
+        store(msg.asInstanceOf[T])
+        n.incrementAndGet
+
+      case ByteBufferData(bytes) =>
+        logDebug("received bytes")
+        store(bytes)
+
+      case props: Props =>
+        val worker = context.actorOf(props)
+        logInfo("Started receiver worker at:" + worker.path)
+        sender ! worker
+
+      case (props: Props, name: String) =>
+        val worker = context.actorOf(props, name)
+        logInfo("Started receiver worker at:" + worker.path)
+        sender ! worker
+
+      case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+      case _: Statistics =>
+        val workers = context.children
+        sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
+    }
+  }
+
+  def onStart(): Unit = {
+    actorSupervisor
+    logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path)
+  }
+
+  def onStop(): Unit = {
+    actorSupervisor ! PoisonPill
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+  }
+}

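A sketch of a receiver actor exercising the three store(...) variants defined above
(the message shapes are illustrative):

    import java.nio.ByteBuffer
    import org.apache.spark.streaming.akka.ActorReceiver

    class MultiStoreActor extends ActorReceiver {
      def receive = {
        case s: String      => store(s)                  // single item, batched by Spark
        case ss: Seq[_]     => store(ss.iterator)        // iterator stored as one block
        case b: Array[Byte] => store(ByteBuffer.wrap(b)) // bytes must use Spark's serializer
      }
    }
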
http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
new file mode 100644
index 0000000..38c35c5
--- /dev/null
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import scala.reflect.ClassTag
+
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
+
+import org.apache.spark.api.java.function.{Function0 => JFunction0}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object AkkaUtils {
+
+  /**
+   * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
+   *
+   * @param ssc The StreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping (default:
+   *                           ActorReceiver.defaultActorSystemCreator)
+   * @param supervisorStrategy the supervisor strategy (default:
+   *                           ActorReceiver.defaultSupervisorStrategy)
+   *
+   * @note An important point to note:
+   *       Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. the parametrized type of the data
+   *       received and of createStream should be the same.
+   */
+  def createStream[T: ClassTag](
+      ssc: StreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
+    ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
+    val cleanF = ssc.sc.clean(actorSystemCreator)
+    ssc.receiverStream(new ActorReceiverSupervisor[T](
+      cleanF,
+      propsForActor,
+      actorName,
+      storageLevel,
+      supervisorStrategy))
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The JavaStreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   * @param supervisorStrategy the supervisor strategy
+   *
+   * @note An important point to note:
+   *       Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. the parametrized type of the data
+   *       received and of createStream should be the same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem],
+      supervisorStrategy: SupervisorStrategy
+    ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](
+      jssc.ssc,
+      propsForActor,
+      actorName,
+      storageLevel,
+      () => actorSystemCreator.call(),
+      supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The JavaStreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @note An important point to note:
+   *       Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. the parametrized type of the data
+   *       received and of createStream should be the same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. Storage level of the data will be the default
+   * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The JavaStreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   *
+   * @note An important point to note:
+   *       Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. the parametrized type of the data
+   *       received and of createStream should be the same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String
+    ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](jssc.ssc, propsForActor, actorName)
+  }
+}

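A sketch of the Scala entry point above with the optional parameters spelled out
(the values are illustrative, not recommendations; ssc and CustomActor as before):

    import akka.actor.{ActorSystem, Props, SupervisorStrategy}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.akka.AkkaUtils

    val lines = AkkaUtils.createStream[String](
      ssc,
      Props[CustomActor](),
      "CustomReceiver",
      storageLevel = StorageLevel.MEMORY_ONLY_SER,
      // Runs on the executor; the ActorSystem is shut down when the receiver stops.
      actorSystemCreator = () => ActorSystem("custom-receiver-system"),
      supervisorStrategy = SupervisorStrategy.defaultStrategy)
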
http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
new file mode 100644
index 0000000..b732506
--- /dev/null
+++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function0;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+
+public class JavaAkkaUtilsSuite {
+
+  @Test // tests the API, does not actually test data receiving
+  public void testAkkaUtils() {
+    JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    try {
+      JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
+        jsc, Props.create(JavaTestActor.class), "test");
+      JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
+        jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
+      JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
+        jsc,
+        Props.create(JavaTestActor.class),
+        "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
+        new ActorSystemCreatorForTest(),
+        SupervisorStrategy.defaultStrategy());
+    } finally {
+      jsc.stop();
+    }
+  }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+  @Override
+  public ActorSystem call() {
+    return null;
+  }
+}
+
+
+class JavaTestActor extends JavaActorReceiver {
+  @Override
+  public void onReceive(Object message) throws Exception {
+    store((String) message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
new file mode 100644
index 0000000..f437585
--- /dev/null
+++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import akka.actor.{Props, SupervisorStrategy}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+class AkkaUtilsSuite extends SparkFunSuite {
+
+  test("createStream") {
+    val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
+    try {
+      // tests the API, does not actually test data receiving
+      val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test")
+      val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
+      val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc,
+        Props[TestActor](),
+        "test",
+        StorageLevel.MEMORY_AND_DISK_SER_2,
+        supervisorStrategy = SupervisorStrategy.defaultStrategy)
+      val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+      val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+      val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc,
+        Props[TestActor](),
+        "test",
+        StorageLevel.MEMORY_AND_DISK_SER_2,
+        () => null,
+        SupervisorStrategy.defaultStrategy)
+    } finally {
+      ssc.stop()
+    }
+  }
+}
+
+class TestActor extends ActorReceiver {
+  override def receive: Receive = {
+    case m: String => store(m)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index a725988..7781aae 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -43,6 +43,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 506ba87..dd367cd 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -23,7 +23,7 @@ import akka.util.ByteString
 import akka.zeromq._
 
 import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.ActorReceiver
+import org.apache.spark.streaming.akka.ActorReceiver
 
 /**
  * A receiver to subscribe to ZeroMQ stream.

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 63cd8a2..1784d6e 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -20,29 +20,33 @@ package org.apache.spark.streaming.zeromq
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import akka.actor.{Props, SupervisorStrategy}
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
 import akka.util.ByteString
 import akka.zeromq.Subscribe
 
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
 
 object ZeroMQUtils {
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param ssc            StreamingContext object
-   * @param publisherUrl   Url of remote zeromq publisher
-   * @param subscribe      Topic to subscribe to
+   * @param ssc StreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
    *                       and each frame has sequence of byte thus it needs the converter
    *                       (which might be deserializer of bytes) to translate from sequence
    *                       of sequence of bytes, where sequence refer to a frame
    *                       and sub sequence refer to its payload.
    * @param storageLevel   RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping (default:
+   *                           ActorReceiver.defaultActorSystemCreator)
+   * @param supervisorStrategy the supervisor strategy (default:
+   *                           ActorReceiver.defaultSupervisorStrategy)
    */
   def createStream[T: ClassTag](
       ssc: StreamingContext,
@@ -50,22 +54,31 @@ object ZeroMQUtils {
       subscribe: Subscribe,
       bytesToObjects: Seq[ByteString] => Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
     ): ReceiverInputDStream[T] = {
-    ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
-      "ZeroMQReceiver", storageLevel, supervisorStrategy)
+    AkkaUtils.createStream(
+      ssc,
+      Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+      "ZeroMQReceiver",
+      storageLevel,
+      actorSystemCreator,
+      supervisorStrategy)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc           JavaStreamingContext object
-   * @param publisherUrl   Url of remote ZeroMQ publisher
-   * @param subscribe      Topic to subscribe to
+   * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might be
    *                       deserializer of bytes) to translate from sequence of sequence of bytes,
    *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel  Storage level to use for storing the received objects
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   * @param supervisorStrategy the supervisor strategy
    */
   def createStream[T](
       jssc: JavaStreamingContext,
@@ -73,25 +86,33 @@ object ZeroMQUtils {
       subscribe: Subscribe,
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
       storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem],
       supervisorStrategy: SupervisorStrategy
     ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn,
+      storageLevel,
+      () => actorSystemCreator.call(),
+      supervisorStrategy)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc           JavaStreamingContext object
-   * @param publisherUrl   Url of remote zeromq publisher
-   * @param subscribe      Topic to subscribe to
+   * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might be
    *                       deserializer of bytes) to translate from sequence of sequence of bytes,
    *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel   RDD storage level.
+   * @param storageLevel RDD storage level.
    */
   def createStream[T](
       jssc: JavaStreamingContext,
@@ -104,14 +125,19 @@ object ZeroMQUtils {
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn,
+      storageLevel)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc           JavaStreamingContext object
-   * @param publisherUrl   Url of remote zeromq publisher
-   * @param subscribe      Topic to subscribe to
+   * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might
    *                       be deserializer of bytes) to translate from sequence of sequence of
@@ -128,6 +154,10 @@ object ZeroMQUtils {
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn)
   }
 }

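The reworked ZeroMQUtils.createStream keeps its shape apart from the new
actorSystemCreator parameter; a minimal Scala sketch (the URL and topic are
illustrative, and ssc is as in the earlier sketches):

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    // Decode each ZeroMQ frame as a UTF-8 string.
    def bytesToStrings(frames: Seq[ByteString]): Iterator[String] =
      frames.map(_.utf8String).iterator

    val lines = ZeroMQUtils.createStream(
      ssc, "tcp://127.0.0.1:1234", Subscribe("foo"), bytesToStrings _)
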
http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 417b91e..9ff4b41 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,14 +17,17 @@
 
 package org.apache.spark.streaming.zeromq;
 
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
+import akka.actor.ActorSystem;
 import akka.actor.SupervisorStrategy;
 import akka.util.ByteString;
 import akka.zeromq.Subscribe;
+import org.junit.Test;
+
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function0;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 
 public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
 
@@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
   public void testZeroMQStream() {
     String publishUrl = "abc";
     Subscribe subscribe = new Subscribe((ByteString)null);
-    Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
-      @Override
-      public Iterable<String> call(byte[][] bytes) throws Exception {
-        return null;
-      }
-    };
+    Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
+    Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
 
     JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
       ssc, publishUrl, subscribe, bytesToObjects);
     JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
       ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
     JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
-      ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
       SupervisorStrategy.defaultStrategy());
   }
 }
+
+class BytesToObjects implements Function<byte[][], Iterable<String>> {
+  @Override
+  public Iterable<String> call(byte[][] bytes) throws Exception {
+    return null;
+  }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+  @Override
+  public ActorSystem call() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 35d2e62..bac2679 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite {
 
     // tests the API, does not actually test data receiving
     val test1: ReceiverInputDStream[String] =
-      ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+      ZeroMQUtils.createStream(
+        ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
     val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
     val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
       ssc, publishUrl, subscribe, bytesToObjects,
-      StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+      StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
+    val test4: ReceiverInputDStream[String] =
+      ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+    val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects,
+      StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
 
-    // TODO: Actually test data receiving
+    // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
     ssc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fca6269..43f08ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
     <module>external/flume</module>
     <module>external/flume-sink</module>
     <module>external/flume-assembly</module>
+    <module>external/akka</module>
     <module>external/mqtt</module>
     <module>external/mqtt-assembly</module>
     <module>external/zeromq</module>

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6469201..905fb4c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -154,6 +154,16 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
       ) ++ Seq(
+        // SPARK-7799 Add "streaming-akka" project
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$6"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$5"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$4"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$3"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream")
+      ) ++ Seq(
         // SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus")

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 06e561a..3927b88 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -35,11 +35,11 @@ object BuildCommons {
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
   val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
-    sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
+    sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka,
     streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) =
     Seq("catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
       "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
-      "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
+      "streaming-flume", "streaming-akka", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
       "streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _))
 
   val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl,
@@ -232,8 +232,9 @@ object SparkBuild extends PomBuild {
   /* Enable tests settings for all projects except examples, assembly and tools */
   (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
 
+  // TODO: remove streamingAkka from this list after 2.0.0
   allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl,
-    networkCommon, networkShuffle, networkYarn, unsafe, testTags).contains(x)).foreach {
+    networkCommon, networkShuffle, networkYarn, unsafe, streamingAkka, testTags).contains(x)).foreach {
       x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
     }
 
@@ -649,7 +650,7 @@ object Unidoc {
       "-public",
       "-group", "Core Java API", packageList("api.java", "api.java.function"),
       "-group", "Spark Streaming", packageList(
-        "streaming.api.java", "streaming.flume", "streaming.kafka",
+        "streaming.api.java", "streaming.flume", "streaming.akka", "streaming.kafka",
         "streaming.mqtt", "streaming.twitter", "streaming.zeromq", "streaming.kinesis"
       ),
       "-group", "MLlib", packageList(

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b7070dd..ec57c05 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -42,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver}
+import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
 import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -296,27 +295,6 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Create an input stream with any arbitrary user implemented actor receiver.
-   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
-   * @param props Props object defining creation of the actor
-   * @param name Name of the actor
-   * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   *
-   * @note An important point to note:
-   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
-   *       to ensure the type safety, i.e parametrized type of data received and actorStream
-   *       should be same.
-   */
-  def actorStream[T: ClassTag](
-      props: Props,
-      name: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
-    ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
-    receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy))
-  }
-
-  /**
    * Create a input stream from TCP source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
    * lines.
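
Callers of the removed method move to the new streaming-akka module. A sketch,
assuming the replacement AkkaUtils.createStream in
org.apache.spark.streaming.akka keeps the (props, name) shape of the removed
actorStream (MyActor stands in for any user-defined receiver actor); the Java
overloads removed from JavaStreamingContext below map to the same call:

    import akka.actor.Props
    import org.apache.spark.streaming.akka.AkkaUtils

    // Before this commit: ssc.actorStream[String](Props(new MyActor), "MyActorReceiver")
    // After, with ssc an existing StreamingContext:
    val lines = AkkaUtils.createStream[String](ssc, Props(new MyActor), "MyActorReceiver")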

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 00f9d8a..7a25ce5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -24,7 +24,6 @@ import java.util.{List => JList, Map => JMap}
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@@ -357,69 +356,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
   }
 
   /**
-   * Create an input stream with any arbitrary user implemented actor receiver.
-   * @param props Props object defining creation of the actor
-   * @param name Name of the actor
-   * @param storageLevel Storage level to use for storing the received objects
-   *
-   * @note An important point to note:
-   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
-   *       to ensure the type safety, i.e parametrized type of data received and actorStream
-   *       should be same.
-   */
-  def actorStream[T](
-      props: Props,
-      name: String,
-      storageLevel: StorageLevel,
-      supervisorStrategy: SupervisorStrategy
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
-  }
-
-  /**
-   * Create an input stream with any arbitrary user implemented actor receiver.
-   * @param props Props object defining creation of the actor
-   * @param name Name of the actor
-   * @param storageLevel Storage level to use for storing the received objects
-   *
-   * @note An important point to note:
-   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
-   *       to ensure the type safety, i.e parametrized type of data received and actorStream
-   *       should be same.
-   */
-  def actorStream[T](
-      props: Props,
-      name: String,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    ssc.actorStream[T](props, name, storageLevel)
-  }
-
-  /**
-   * Create an input stream with any arbitrary user implemented actor receiver.
-   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param props Props object defining creation of the actor
-   * @param name Name of the actor
-   *
-   * @note An important point to note:
-   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
-   *       to ensure the type safety, i.e parametrized type of data received and actorStream
-   *       should be same.
-   */
-  def actorStream[T](
-      props: Props,
-      name: String
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    ssc.actorStream[T](props, name)
-  }
-
-  /**
    * Create an input stream from an queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/b7d74a60/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
deleted file mode 100644
index 0eabf3d..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.receiver
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.reflect.ClassTag
-
-import akka.actor._
-import akka.actor.SupervisorStrategy.{Escalate, Restart}
-
-import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.storage.StorageLevel
-
-/**
- * :: DeveloperApi ::
- * A helper with set of defaults for supervisor strategy
- */
-@DeveloperApi
-object ActorSupervisorStrategy {
-
-  val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
-    15 millis) {
-    case _: RuntimeException => Restart
-    case _: Exception => Escalate
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * A base Actor that provides APIs for pushing received data into Spark Streaming for processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- *  class MyActor extends ActorReceiver {
- *      def receive {
- *          case anything: String => store(anything)
- *      }
- *  }
- *
- *  // Can be used with an actorStream as follows
- *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
- *
- * }}}
- *
- * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- *       to ensure the type safety, i.e parametrized type of push block and InputDStream
- *       should be same.
- */
-@DeveloperApi
-abstract class ActorReceiver extends Actor {
-
-  /** Store an iterator of received data as a data block into Spark's memory. */
-  def store[T](iter: Iterator[T]) {
-    context.parent ! IteratorData(iter)
-  }
-
-  /**
-   * Store the bytes of received data as a data block into Spark's memory. Note
-   * that the data in the ByteBuffer must be serialized using the same serializer
-   * that Spark is configured to use.
-   */
-  def store(bytes: ByteBuffer) {
-    context.parent ! ByteBufferData(bytes)
-  }
-
-  /**
-   * Store a single item of received data to Spark's memory.
-   * These single items will be aggregated together into data blocks before
-   * being pushed into Spark's memory.
-   */
-  def store[T](item: T) {
-    context.parent ! SingleItemData(item)
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for
- * processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- *  class MyActor extends JavaActorReceiver {
- *      def receive {
- *          case anything: String => store(anything)
- *      }
- *  }
- *
- *  // Can be used with an actorStream as follows
- *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
- *
- * }}}
- *
- * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- *       to ensure the type safety, i.e parametrized type of push block and InputDStream
- *       should be same.
- */
-@DeveloperApi
-abstract class JavaActorReceiver extends UntypedActor {
-
-  /** Store an iterator of received data as a data block into Spark's memory. */
-  def store[T](iter: Iterator[T]) {
-    context.parent ! IteratorData(iter)
-  }
-
-  /**
-   * Store the bytes of received data as a data block into Spark's memory. Note
-   * that the data in the ByteBuffer must be serialized using the same serializer
-   * that Spark is configured to use.
-   */
-  def store(bytes: ByteBuffer) {
-    context.parent ! ByteBufferData(bytes)
-  }
-
-  /**
-   * Store a single item of received data to Spark's memory.
-   * These single items will be aggregated together into data blocks before
-   * being pushed into Spark's memory.
-   */
-  def store[T](item: T) {
-    context.parent ! SingleItemData(item)
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Statistics for querying the supervisor about state of workers. Used in
- * conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receiver.ActorReceiver]].
- */
-@DeveloperApi
-case class Statistics(numberOfMsgs: Int,
-  numberOfWorkers: Int,
-  numberOfHiccups: Int,
-  otherInfo: String)
-
-/** Case class to receive data sent by child actors */
-private[streaming] sealed trait ActorReceiverData
-private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
-private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
-private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
-
-/**
- * Provides Actors as receivers for receiving stream.
- *
- * As Actors can also be used to receive data from almost any stream source.
- * A nice set of abstraction(s) for actors as receivers is already provided for
- * a few general cases. It is thus exposed as an API where user may come with
- * their own Actor to run as receiver for Spark Streaming input source.
- *
- * This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
- *
- * Here's a way to start more supervisor/workers as its children.
- *
- * @example {{{
- *  context.parent ! Props(new Supervisor)
- * }}} OR {{{
- *  context.parent ! Props(new Worker, "Worker")
- * }}}
- */
-private[streaming] class ActorReceiverSupervisor[T: ClassTag](
-    props: Props,
-    name: String,
-    storageLevel: StorageLevel,
-    receiverSupervisorStrategy: SupervisorStrategy
-  ) extends Receiver[T](storageLevel) with Logging {
-
-  protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
-    "Supervisor" + streamId)
-
-  class Supervisor extends Actor {
-
-    override val supervisorStrategy = receiverSupervisorStrategy
-    private val worker = context.actorOf(props, name)
-    logInfo("Started receiver worker at:" + worker.path)
-
-    private val n: AtomicInteger = new AtomicInteger(0)
-    private val hiccups: AtomicInteger = new AtomicInteger(0)
-
-    override def receive: PartialFunction[Any, Unit] = {
-
-      case IteratorData(iterator) =>
-        logDebug("received iterator")
-        store(iterator.asInstanceOf[Iterator[T]])
-
-      case SingleItemData(msg) =>
-        logDebug("received single")
-        store(msg.asInstanceOf[T])
-        n.incrementAndGet
-
-      case ByteBufferData(bytes) =>
-        logDebug("received bytes")
-        store(bytes)
-
-      case props: Props =>
-        val worker = context.actorOf(props)
-        logInfo("Started receiver worker at:" + worker.path)
-        sender ! worker
-
-      case (props: Props, name: String) =>
-        val worker = context.actorOf(props, name)
-        logInfo("Started receiver worker at:" + worker.path)
-        sender ! worker
-
-      case _: PossiblyHarmful => hiccups.incrementAndGet()
-
-      case _: Statistics =>
-        val workers = context.children
-        sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
-
-    }
-  }
-
-  def onStart(): Unit = {
-    actorSupervisor
-    logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path)
-  }
-
-  def onStop(): Unit = {
-    actorSupervisor ! PoisonPill
-  }
-}
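
This receiver is recreated in the new external/akka module (the streaming.akka
package added to the docs above). A sketch of a custom receiver against the
relocated classes, assuming the ActorReceiver store API carries over unchanged
to org.apache.spark.streaming.akka:

    import akka.actor.Props
    import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

    // Minimal receiver actor: every String message it gets is handed to
    // Spark Streaming via store().
    class EchoReceiver extends ActorReceiver {
      def receive = {
        case s: String => store(s)
      }
    }

    // ssc is an existing StreamingContext.
    val lines = AkkaUtils.createStream[String](ssc, Props(new EchoReceiver), "EchoReceiver")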

