Posted to commits@bahir.apache.org by lr...@apache.org on 2016/06/10 15:24:06 UTC

[27/50] [abbrv] bahir git commit: [SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project

[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project

Include the following changes:

1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly

Author: Shixiong Zhu <sh...@databricks.com>

Closes #10744 from zsxwing/streaming-akka-2.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e3591971
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e3591971
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e3591971

Branch: refs/heads/master
Commit: e3591971054e18618fd6ce400efac15411619285
Parents: 618b862
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Jan 20 13:55:41 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jan 20 13:55:41 2016 -0800

----------------------------------------------------------------------
 .../streaming/akka/JavaActorWordCount.java      |  14 +-
 .../streaming/akka/ActorWordCount.scala         |  37 +++--
 streaming-akka/pom.xml                          |  73 +++++++++
 .../spark/streaming/akka/ActorReceiver.scala    |  64 +++++---
 .../apache/spark/streaming/akka/AkkaUtils.scala | 147 +++++++++++++++++++
 .../streaming/akka/JavaAkkaUtilsSuite.java      |  66 +++++++++
 .../spark/streaming/akka/AkkaUtilsSuite.scala   |  64 ++++++++
 .../streaming/zeromq/ZeroMQWordCount.scala      |  13 +-
 streaming-zeromq/pom.xml                        |   5 +
 .../spark/streaming/zeromq/ZeroMQReceiver.scala |   2 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |  76 +++++++---
 .../streaming/zeromq/JavaZeroMQStreamSuite.java |  31 ++--
 .../streaming/zeromq/ZeroMQStreamSuite.scala    |  16 +-
 13 files changed, 526 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
index 2377207..62e5633 100644
--- a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
+++ b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
@@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.JavaActorReceiver;
+import org.apache.spark.streaming.akka.AkkaUtils;
+import org.apache.spark.streaming.akka.JavaActorReceiver;
 
 /**
  * A sample actor as receiver, is also simplest. This receiver actor
@@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
     remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
   }
 
+  @Override
   public void onReceive(Object msg) throws Exception {
     store((T) msg);
   }
@@ -100,18 +102,20 @@ public class JavaActorWordCount {
     String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
 
     /*
-     * Following is the use of actorStream to plug in custom actor as receiver
+     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
      *
      * An important point to note:
      * Since Actor may exist outside the spark framework, It is thus user's responsibility
      * to ensure the type safety, i.e type of data received and InputDstream
      * should be same.
      *
-     * For example: Both actorStream and JavaSampleActorReceiver are parameterized
+     * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
      * to same type to ensure type safety.
      */
-    JavaDStream<String> lines = jssc.actorStream(
-        Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
+    JavaDStream<String> lines = AkkaUtils.createStream(
+        jssc,
+        Props.create(JavaSampleActorReceiver.class, feederActorURI),
+        "SampleReceiver");
 
     // compute wordcount
     lines.flatMap(new FlatMapFunction<String, String>() {

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
index 88cdc6b..8e88987 100644
--- a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
+++ b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
@@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
 import scala.reflect.ClassTag
 import scala.util.Random
 
-import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
+import akka.actor._
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.receiver.ActorReceiver
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 
 case class SubscribeReceiver(receiverActor: ActorRef)
 case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -78,8 +78,7 @@ class FeederActor extends Actor {
  *
  * @see [[org.apache.spark.examples.streaming.FeederActor]]
  */
-class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends ActorReceiver {
+class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
@@ -108,9 +107,13 @@ object FeederActor {
     }
     val Seq(host, port) = args.toSeq
 
-    val conf = new SparkConf
-    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
-      securityManager = new SecurityManager(conf))._1
+    val akkaConf = ConfigFactory.parseString(
+      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+         |akka.remote.netty.tcp.hostname = "$host"
+         |akka.remote.netty.tcp.port = $port
+         |""".stripMargin)
+    val actorSystem = ActorSystem("test", akkaConf)
     val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
 
     println("Feeder started as:" + feeder)
@@ -121,6 +124,7 @@ object FeederActor {
 
 /**
  * A sample word count program demonstrating the use of plugging in
+ *
  * Actor as Receiver
  * Usage: ActorWordCount <hostname> <port>
  *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
@@ -146,20 +150,21 @@ object ActorWordCount {
     val ssc = new StreamingContext(sparkConf, Seconds(2))
 
     /*
-     * Following is the use of actorStream to plug in custom actor as receiver
+     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
      *
      * An important point to note:
      * Since Actor may exist outside the spark framework, It is thus user's responsibility
-     * to ensure the type safety, i.e type of data received and InputDstream
+     * to ensure the type safety, i.e type of data received and InputDStream
      * should be same.
      *
-     * For example: Both actorStream and SampleActorReceiver are parameterized
+     * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
      * to same type to ensure type safety.
      */
-
-    val lines = ssc.actorStream[String](
-      Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
-        host, port.toInt))), "SampleReceiver")
+    val lines = AkkaUtils.createStream[String](
+      ssc,
+      Props(classOf[SampleActorReceiver[String]],
+        "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
+      "SampleReceiver")
 
     // compute wordcount
     lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml
new file mode 100644
index 0000000..34de9ba
--- /dev/null
+++ b/streaming-akka/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-akka_2.10</artifactId>
+  <properties>
+    <sbt.project.name>streaming-akka</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Project External Akka</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor_${scala.binary.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-remote_${scala.binary.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
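
For downstream builds, the new module is consumed like any other external
streaming module; the streaming-zeromq pom.xml change later in this commit does
exactly that. As a sketch, an sbt user would add the coordinates from this POM
(the version shown is the in-tree snapshot, not a released version):

    libraryDependencies += "org.apache.spark" %% "spark-streaming-akka" % "2.0.0-SNAPSHOT"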

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index 0eabf3d..c75dc92 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.streaming.receiver
+package org.apache.spark.streaming.akka
 
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicInteger
@@ -26,23 +26,44 @@ import scala.reflect.ClassTag
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * :: DeveloperApi ::
  * A helper with set of defaults for supervisor strategy
  */
 @DeveloperApi
-object ActorSupervisorStrategy {
+object ActorReceiver {
 
-  val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+  /**
+   * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
+   * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
+   * others, it just escalates the failure to the supervisor of the supervisor.
+   */
+  val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
     15 millis) {
     case _: RuntimeException => Restart
     case _: Exception => Escalate
   }
+
+  /**
+   * A default ActorSystem creator. It will use a unique system name
+   * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
+   * communication.
+   */
+  val defaultActorSystemCreator: () => ActorSystem = () => {
+    val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+    val akkaConf = ConfigFactory.parseString(
+      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+         |""".stripMargin)
+    ActorSystem(uniqueSystemName, akkaConf)
+  }
 }
 
 /**
@@ -58,13 +79,12 @@ object ActorSupervisorStrategy {
  *      }
  *  }
  *
- *  // Can be used with an actorStream as follows
- *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ *  AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver")
  *
  * }}}
  *
  * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- *       to ensure the type safety, i.e parametrized type of push block and InputDStream
+ *       to ensure the type safety, i.e. parametrized type of push block and InputDStream
  *       should be same.
  */
 @DeveloperApi
@@ -103,18 +123,18 @@ abstract class ActorReceiver extends Actor {
  *
  * @example {{{
  *  class MyActor extends JavaActorReceiver {
- *      def receive {
- *          case anything: String => store(anything)
+ *      @Override
+ *      public void onReceive(Object msg) throws Exception {
+ *          store((String) msg);
  *      }
  *  }
  *
- *  // Can be used with an actorStream as follows
- *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ *  AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
  *
  * }}}
  *
  * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- *       to ensure the type safety, i.e parametrized type of push block and InputDStream
+ *       to ensure the type safety, i.e. parametrized type of push block and InputDStream
  *       should be same.
  */
 @DeveloperApi
@@ -147,8 +167,8 @@ abstract class JavaActorReceiver extends UntypedActor {
 /**
  * :: DeveloperApi ::
  * Statistics for querying the supervisor about state of workers. Used in
- * conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receiver.ActorReceiver]].
+ * conjunction with `AkkaUtils.createStream` and
+ * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]].
  */
 @DeveloperApi
 case class Statistics(numberOfMsgs: Int,
@@ -157,10 +177,10 @@ case class Statistics(numberOfMsgs: Int,
   otherInfo: String)
 
 /** Case class to receive data sent by child actors */
-private[streaming] sealed trait ActorReceiverData
-private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
-private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
-private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+private[akka] sealed trait ActorReceiverData
+private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
+private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
 
 /**
  * Provides Actors as receivers for receiving stream.
@@ -181,14 +201,16 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec
  *  context.parent ! Props(new Worker, "Worker")
  * }}}
  */
-private[streaming] class ActorReceiverSupervisor[T: ClassTag](
+private[akka] class ActorReceiverSupervisor[T: ClassTag](
+    actorSystemCreator: () => ActorSystem,
     props: Props,
     name: String,
     storageLevel: StorageLevel,
     receiverSupervisorStrategy: SupervisorStrategy
   ) extends Receiver[T](storageLevel) with Logging {
 
-  protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
+  private lazy val actorSystem = actorSystemCreator()
+  protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
     "Supervisor" + streamId)
 
   class Supervisor extends Actor {
@@ -241,5 +263,7 @@ private[streaming] class ActorReceiverSupervisor[T: ClassTag](
 
   def onStop(): Unit = {
     actorSupervisor ! PoisonPill
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
   }
 }
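
The actorSystemCreator hook threaded through ActorReceiverSupervisor above is
what replaces the ActorSystem that previously came from SparkEnv. A minimal
sketch of supplying a custom creator through AkkaUtils.createStream; the system
name and config values are illustrative assumptions, and MyStringReceiver is
the hypothetical receiver from the earlier sketch:

    import akka.actor.{ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.akka.AkkaUtils

    // Called lazily on the executor; onStop() shuts the system down again.
    val remoteCapableSystem: () => ActorSystem = () => {
      val conf = ConfigFactory.parseString(
        """akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
          |""".stripMargin)
      ActorSystem("my-receiver-system", conf)
    }

    val lines = AkkaUtils.createStream[String](
      ssc,
      Props(new MyStringReceiver),
      "MyReceiver",
      StorageLevel.MEMORY_AND_DISK_SER_2,
      actorSystemCreator = remoteCapableSystem)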

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
new file mode 100644
index 0000000..38c35c5
--- /dev/null
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import scala.reflect.ClassTag
+
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
+
+import org.apache.spark.api.java.function.{Function0 => JFunction0}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object AkkaUtils {
+
+  /**
+   * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
+   *
+   * @param ssc The StreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping (default:
+   *                           ActorReceiver.defaultActorSystemCreator)
+   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy)
+   *
+   * @note An important point to note:
+   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+   *       to ensure the type safety, i.e. parametrized type of data received and createStream
+   *       should be same.
+   */
+  def createStream[T: ClassTag](
+      ssc: StreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
+    ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
+    val cleanF = ssc.sc.clean(actorSystemCreator)
+    ssc.receiverStream(new ActorReceiverSupervisor[T](
+      cleanF,
+      propsForActor,
+      actorName,
+      storageLevel,
+      supervisorStrategy))
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The JavaStreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   * @param supervisorStrategy the supervisor strategy
+   *
+   * @note An important point to note:
+   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+   *       to ensure the type safety, i.e. parametrized type of data received and createStream
+   *       should be same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem],
+      supervisorStrategy: SupervisorStrategy
+    ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](
+      jssc.ssc,
+      propsForActor,
+      actorName,
+      storageLevel,
+      () => actorSystemCreator.call(),
+      supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The JavaStreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @note An important point to note:
+   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+   *       to ensure the type safety, i.e. parametrized type of data received and createStream
+   *       should be same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. Storage level of the data will be the default
+   * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The JavaStreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   *
+   * @note An important point to note:
+   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+   *       to ensure the type safety, i.e. parametrized type of data received and createStream
+   *       should be same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String
+    ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](jssc.ssc, propsForActor, actorName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
new file mode 100644
index 0000000..b732506
--- /dev/null
+++ b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function0;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+
+public class JavaAkkaUtilsSuite {
+
+  @Test // tests the API, does not actually test data receiving
+  public void testAkkaUtils() {
+    JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    try {
+      JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
+        jsc, Props.create(JavaTestActor.class), "test");
+      JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
+        jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
+      JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
+        jsc,
+        Props.create(JavaTestActor.class),
+        "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
+        new ActorSystemCreatorForTest(),
+        SupervisorStrategy.defaultStrategy());
+    } finally {
+      jsc.stop();
+    }
+  }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+  @Override
+  public ActorSystem call() {
+    return null;
+  }
+}
+
+
+class JavaTestActor extends JavaActorReceiver {
+  @Override
+  public void onReceive(Object message) throws Exception {
+    store((String) message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
new file mode 100644
index 0000000..f437585
--- /dev/null
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import akka.actor.{Props, SupervisorStrategy}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+class AkkaUtilsSuite extends SparkFunSuite {
+
+  test("createStream") {
+    val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
+    try {
+      // tests the API, does not actually test data receiving
+      val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test")
+      val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
+      val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc,
+        Props[TestActor](),
+        "test",
+        StorageLevel.MEMORY_AND_DISK_SER_2,
+        supervisorStrategy = SupervisorStrategy.defaultStrategy)
+      val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+      val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+      val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
+        ssc,
+        Props[TestActor](),
+        "test",
+        StorageLevel.MEMORY_AND_DISK_SER_2,
+        () => null,
+        SupervisorStrategy.defaultStrategy)
+    } finally {
+      ssc.stop()
+    }
+  }
+}
+
+class TestActor extends ActorReceiver {
+  override def receive: Receive = {
+    case m: String => store(m)
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
index 9644890..f612e50 100644
--- a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
+++ b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
@@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
 import akka.util.ByteString
 import akka.zeromq._
 import akka.zeromq.Subscribe
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.zeromq._
 
@@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
  *
  * To run this example locally, you may run publisher as
  *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ *      org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
  * and run the example as
  *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ *      org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
  */
 // scalastyle:on
 object ZeroMQWordCount {
@@ -90,7 +91,11 @@ object ZeroMQWordCount {
     def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
 
     // For this stream, a zeroMQ publisher should be running.
-    val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
+    val lines = ZeroMQUtils.createStream(
+      ssc,
+      url,
+      Subscribe(topic),
+      bytesToStringIterator _)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml
index a725988..7781aae 100644
--- a/streaming-zeromq/pom.xml
+++ b/streaming-zeromq/pom.xml
@@ -43,6 +43,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 506ba87..dd367cd 100644
--- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -23,7 +23,7 @@ import akka.util.ByteString
 import akka.zeromq._
 
 import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.ActorReceiver
+import org.apache.spark.streaming.akka.ActorReceiver
 
 /**
  * A receiver to subscribe to ZeroMQ stream.

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 63cd8a2..1784d6e 100644
--- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -20,29 +20,33 @@ package org.apache.spark.streaming.zeromq
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import akka.actor.{Props, SupervisorStrategy}
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
 import akka.util.ByteString
 import akka.zeromq.Subscribe
 
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
 
 object ZeroMQUtils {
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param ssc            StreamingContext object
-   * @param publisherUrl   Url of remote zeromq publisher
-   * @param subscribe      Topic to subscribe to
+   * @param ssc StreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
    *                       and each frame has sequence of byte thus it needs the converter
    *                       (which might be deserializer of bytes) to translate from sequence
    *                       of sequence of bytes, where sequence refer to a frame
    *                       and sub sequence refer to its payload.
    * @param storageLevel   RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping (default:
+   *                           ActorReceiver.defaultActorSystemCreator)
+   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy)
    */
   def createStream[T: ClassTag](
       ssc: StreamingContext,
@@ -50,22 +54,31 @@ object ZeroMQUtils {
       subscribe: Subscribe,
       bytesToObjects: Seq[ByteString] => Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
     ): ReceiverInputDStream[T] = {
-    ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
-      "ZeroMQReceiver", storageLevel, supervisorStrategy)
+    AkkaUtils.createStream(
+      ssc,
+      Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+      "ZeroMQReceiver",
+      storageLevel,
+      actorSystemCreator,
+      supervisorStrategy)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc           JavaStreamingContext object
-   * @param publisherUrl   Url of remote ZeroMQ publisher
-   * @param subscribe      Topic to subscribe to
+   * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might be
    *                       deserializer of bytes) to translate from sequence of sequence of bytes,
    *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel  Storage level to use for storing the received objects
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   * @param supervisorStrategy the supervisor strategy
    */
   def createStream[T](
       jssc: JavaStreamingContext,
@@ -73,25 +86,33 @@ object ZeroMQUtils {
       subscribe: Subscribe,
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
       storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem],
       supervisorStrategy: SupervisorStrategy
     ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn,
+      storageLevel,
+      () => actorSystemCreator.call(),
+      supervisorStrategy)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc           JavaStreamingContext object
-   * @param publisherUrl   Url of remote zeromq publisher
-   * @param subscribe      Topic to subscribe to
+   * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might be
    *                       deserializer of bytes) to translate from sequence of sequence of bytes,
    *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel   RDD storage level.
+   * @param storageLevel RDD storage level.
    */
   def createStream[T](
       jssc: JavaStreamingContext,
@@ -104,14 +125,19 @@ object ZeroMQUtils {
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn,
+      storageLevel)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc           JavaStreamingContext object
-   * @param publisherUrl   Url of remote zeromq publisher
-   * @param subscribe      Topic to subscribe to
+   * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might
    *                       be deserializer of bytes) to translate from sequence of sequence of
@@ -128,6 +154,10 @@ object ZeroMQUtils {
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn)
   }
 }
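
Taken together, the ZeroMQ changes keep the Scala entry point's shape and
simply add the optional actorSystemCreator parameter. A minimal sketch
mirroring the updated ZeroMQWordCount example, using the sample URL and topic
from its docs:

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    def bytesToStringIterator(frames: Seq[ByteString]): Iterator[String] =
      frames.map(_.utf8String).iterator

    // actorSystemCreator is omitted, so ActorReceiver.defaultActorSystemCreator
    // (defined earlier in this commit) is used on the executors.
    val lines = ZeroMQUtils.createStream(
      ssc,
      "tcp://127.0.0.1:1234",
      Subscribe("foo"),
      bytesToStringIterator _)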

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 417b91e..9ff4b41 100644
--- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,14 +17,17 @@
 
 package org.apache.spark.streaming.zeromq;
 
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
+import akka.actor.ActorSystem;
 import akka.actor.SupervisorStrategy;
 import akka.util.ByteString;
 import akka.zeromq.Subscribe;
+import org.junit.Test;
+
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function0;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 
 public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
 
@@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
   public void testZeroMQStream() {
     String publishUrl = "abc";
     Subscribe subscribe = new Subscribe((ByteString)null);
-    Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
-      @Override
-      public Iterable<String> call(byte[][] bytes) throws Exception {
-        return null;
-      }
-    };
+    Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
+    Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
 
     JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
       ssc, publishUrl, subscribe, bytesToObjects);
     JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
       ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
     JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
-      ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
       SupervisorStrategy.defaultStrategy());
   }
 }
+
+class BytesToObjects implements Function<byte[][], Iterable<String>> {
+  @Override
+  public Iterable<String> call(byte[][] bytes) throws Exception {
+    return null;
+  }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+  @Override
+  public ActorSystem call() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 35d2e62..bac2679 100644
--- a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite {
 
     // tests the API, does not actually test data receiving
     val test1: ReceiverInputDStream[String] =
-      ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+      ZeroMQUtils.createStream(
+        ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
     val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
     val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
       ssc, publishUrl, subscribe, bytesToObjects,
-      StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+      StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
+    val test4: ReceiverInputDStream[String] =
+      ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+    val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects,
+      StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
 
-    // TODO: Actually test data receiving
+    // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
     ssc.stop()
   }
 }