You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/06/10 15:24:06 UTC
[27/50] [abbrv] bahir git commit:
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:
1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the code accordingly
Author: Shixiong Zhu <sh...@databricks.com>
Closes #10744 from zsxwing/streaming-akka-2.
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e3591971
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e3591971
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e3591971
Branch: refs/heads/master
Commit: e3591971054e18618fd6ce400efac15411619285
Parents: 618b862
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Jan 20 13:55:41 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jan 20 13:55:41 2016 -0800
----------------------------------------------------------------------
.../streaming/akka/JavaActorWordCount.java | 14 +-
.../streaming/akka/ActorWordCount.scala | 37 +++--
streaming-akka/pom.xml | 73 +++++++++
.../spark/streaming/akka/ActorReceiver.scala | 64 +++++---
.../apache/spark/streaming/akka/AkkaUtils.scala | 147 +++++++++++++++++++
.../streaming/akka/JavaAkkaUtilsSuite.java | 66 +++++++++
.../spark/streaming/akka/AkkaUtilsSuite.scala | 64 ++++++++
.../streaming/zeromq/ZeroMQWordCount.scala | 13 +-
streaming-zeromq/pom.xml | 5 +
.../spark/streaming/zeromq/ZeroMQReceiver.scala | 2 +-
.../spark/streaming/zeromq/ZeroMQUtils.scala | 76 +++++++---
.../streaming/zeromq/JavaZeroMQStreamSuite.java | 31 ++--
.../streaming/zeromq/ZeroMQStreamSuite.scala | 16 +-
13 files changed, 526 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
index 2377207..62e5633 100644
--- a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
+++ b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
@@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.JavaActorReceiver;
+import org.apache.spark.streaming.akka.AkkaUtils;
+import org.apache.spark.streaming.akka.JavaActorReceiver;
/**
* A sample actor as receiver, is also simplest. This receiver actor
@@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
}
+ @Override
public void onReceive(Object msg) throws Exception {
store((T) msg);
}
@@ -100,18 +102,20 @@ public class JavaActorWordCount {
String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
/*
- * Following is the use of actorStream to plug in custom actor as receiver
+ * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e type of data received and InputDstream
* should be same.
*
- * For example: Both actorStream and JavaSampleActorReceiver are parameterized
+ * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
- JavaDStream<String> lines = jssc.actorStream(
- Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
+ JavaDStream<String> lines = AkkaUtils.createStream(
+ jssc,
+ Props.create(JavaSampleActorReceiver.class, feederActorURI),
+ "SampleReceiver");
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
index 88cdc6b..8e88987 100644
--- a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
+++ b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
@@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
import scala.util.Random
-import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
+import akka.actor._
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.receiver.ActorReceiver
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -78,8 +78,7 @@ class FeederActor extends Actor {
*
* @see [[org.apache.spark.examples.streaming.FeederActor]]
*/
-class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends ActorReceiver {
+class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
@@ -108,9 +107,13 @@ object FeederActor {
}
val Seq(host, port) = args.toSeq
- val conf = new SparkConf
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
- securityManager = new SecurityManager(conf))._1
+ val akkaConf = ConfigFactory.parseString(
+ s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |akka.remote.netty.tcp.hostname = "$host"
+ |akka.remote.netty.tcp.port = $port
+ |""".stripMargin)
+ val actorSystem = ActorSystem("test", akkaConf)
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
@@ -121,6 +124,7 @@ object FeederActor {
/**
* A sample word count program demonstrating the use of plugging in
+ *
* Actor as Receiver
* Usage: ActorWordCount <hostname> <port>
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
@@ -146,20 +150,21 @@ object ActorWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(2))
/*
- * Following is the use of actorStream to plug in custom actor as receiver
+ * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e type of data received and InputDstream
+ * to ensure the type safety, i.e type of data received and InputDStream
* should be same.
*
- * For example: Both actorStream and SampleActorReceiver are parameterized
+ * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
-
- val lines = ssc.actorStream[String](
- Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
- host, port.toInt))), "SampleReceiver")
+ val lines = AkkaUtils.createStream[String](
+ ssc,
+ Props(classOf[SampleActorReceiver[String]],
+ "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
+ "SampleReceiver")
// compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml
new file mode 100644
index 0000000..34de9ba
--- /dev/null
+++ b/streaming-akka/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_2.10</artifactId>
+ <properties>
+ <sbt.project.name>streaming-akka</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Project External Akka</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-actor_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-remote_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index 0eabf3d..c75dc92 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.receiver
+package org.apache.spark.streaming.akka
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
@@ -26,23 +26,44 @@ import scala.reflect.ClassTag
import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
/**
* :: DeveloperApi ::
* A helper with set of defaults for supervisor strategy
*/
@DeveloperApi
-object ActorSupervisorStrategy {
+object ActorReceiver {
- val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ /**
+ * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
+ * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
+ * others, it just escalates the failure to the supervisor of the supervisor.
+ */
+ val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException => Restart
case _: Exception => Escalate
}
+
+ /**
+ * A default ActorSystem creator. It will use a unique system name
+ * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
+ * communication.
+ */
+ val defaultActorSystemCreator: () => ActorSystem = () => {
+ val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+ val akkaConf = ConfigFactory.parseString(
+ s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |""".stripMargin)
+ ActorSystem(uniqueSystemName, akkaConf)
+ }
}
/**
@@ -58,13 +79,12 @@ object ActorSupervisorStrategy {
* }
* }
*
- * // Can be used with an actorStream as follows
- * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ * AkkaUtils.createStream[String](ssc, Props[MyActor](), "MyActorReceiver")
*
* }}}
*
* @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of push block and InputDStream
+ * to ensure the type safety, i.e. parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
@@ -103,18 +123,18 @@ abstract class ActorReceiver extends Actor {
*
* @example {{{
* class MyActor extends JavaActorReceiver {
- * def receive {
- * case anything: String => store(anything)
+ * @Override
+ * public void onReceive(Object msg) throws Exception {
+ * store((String) msg);
* }
* }
*
- * // Can be used with an actorStream as follows
- * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ * AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
*
* }}}
*
* @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of push block and InputDStream
+ * to ensure the type safety, i.e. parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
@@ -147,8 +167,8 @@ abstract class JavaActorReceiver extends UntypedActor {
/**
* :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
- * conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receiver.ActorReceiver]].
+ * conjunction with `AkkaUtils.createStream` and
+ * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]].
*/
@DeveloperApi
case class Statistics(numberOfMsgs: Int,
@@ -157,10 +177,10 @@ case class Statistics(numberOfMsgs: Int,
otherInfo: String)
/** Case class to receive data sent by child actors */
-private[streaming] sealed trait ActorReceiverData
-private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
-private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
-private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+private[akka] sealed trait ActorReceiverData
+private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
+private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
/**
* Provides Actors as receivers for receiving stream.
@@ -181,14 +201,16 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec
* context.parent ! Props(new Worker, "Worker")
* }}}
*/
-private[streaming] class ActorReceiverSupervisor[T: ClassTag](
+private[akka] class ActorReceiverSupervisor[T: ClassTag](
+ actorSystemCreator: () => ActorSystem,
props: Props,
name: String,
storageLevel: StorageLevel,
receiverSupervisorStrategy: SupervisorStrategy
) extends Receiver[T](storageLevel) with Logging {
- protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
+ private lazy val actorSystem = actorSystemCreator()
+ protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
class Supervisor extends Actor {
@@ -241,5 +263,7 @@ private[streaming] class ActorReceiverSupervisor[T: ClassTag](
def onStop(): Unit = {
actorSupervisor ! PoisonPill
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
new file mode 100644
index 0000000..38c35c5
--- /dev/null
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import scala.reflect.ClassTag
+
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
+
+import org.apache.spark.api.java.function.{Function0 => JFunction0}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object AkkaUtils {
+
+ /**
+ * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
+ *
+ * @param ssc The StreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping (default:
+ * ActorReceiver.defaultActorSystemCreator)
+ * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy)
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+ supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
+ ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
+ val cleanF = ssc.sc.clean(actorSystemCreator)
+ ssc.receiverStream(new ActorReceiverSupervisor[T](
+ cleanF,
+ propsForActor,
+ actorName,
+ storageLevel,
+ supervisorStrategy))
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The JavaStreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping.
+ * @param supervisorStrategy the supervisor strategy
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel,
+ actorSystemCreator: JFunction0[ActorSystem],
+ supervisorStrategy: SupervisorStrategy
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](
+ jssc.ssc,
+ propsForActor,
+ actorName,
+ storageLevel,
+ () => actorSystemCreator.call(),
+ supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The JavaStreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. Storage level of the data will be the default
+ * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The JavaStreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](jssc.ssc, propsForActor, actorName)
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
new file mode 100644
index 0000000..b732506
--- /dev/null
+++ b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function0;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+
+public class JavaAkkaUtilsSuite {
+
+ @Test // tests the API, does not actually test data receiving
+ public void testAkkaUtils() {
+ JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ try {
+ JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
+ jsc, Props.create(JavaTestActor.class), "test");
+ JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
+ jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
+ jsc,
+ Props.create(JavaTestActor.class),
+ "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
+ new ActorSystemCreatorForTest(),
+ SupervisorStrategy.defaultStrategy());
+ } finally {
+ jsc.stop();
+ }
+ }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+ @Override
+ public ActorSystem call() {
+ return null;
+ }
+}
+
+
+class JavaTestActor extends JavaActorReceiver {
+ @Override
+ public void onReceive(Object message) throws Exception {
+ store((String) message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
new file mode 100644
index 0000000..f437585
--- /dev/null
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import akka.actor.{Props, SupervisorStrategy}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+class AkkaUtilsSuite extends SparkFunSuite {
+
+ test("createStream") {
+ val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
+ try {
+ // tests the API, does not actually test data receiving
+ val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test")
+ val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc,
+ Props[TestActor](),
+ "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy = SupervisorStrategy.defaultStrategy)
+ val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+ val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+ val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc,
+ Props[TestActor](),
+ "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2,
+ () => null,
+ SupervisorStrategy.defaultStrategy)
+ } finally {
+ ssc.stop()
+ }
+ }
+}
+
+class TestActor extends ActorReceiver {
+ override def receive: Receive = {
+ case m: String => store(m)
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
index 9644890..f612e50 100644
--- a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
+++ b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
@@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
import akka.util.ByteString
import akka.zeromq._
import akka.zeromq.Subscribe
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq._
@@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
*
* To run this example locally, you may run publisher as
* `$ bin/run-example \
- * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
* and run the example as
* `$ bin/run-example \
- * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
*/
// scalastyle:on
object ZeroMQWordCount {
@@ -90,7 +91,11 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
// For this stream, a zeroMQ publisher should be running.
- val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
+ val lines = ZeroMQUtils.createStream(
+ ssc,
+ url,
+ Subscribe(topic),
+ bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml
index a725988..7781aae 100644
--- a/streaming-zeromq/pom.xml
+++ b/streaming-zeromq/pom.xml
@@ -43,6 +43,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 506ba87..dd367cd 100644
--- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -23,7 +23,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.ActorReceiver
+import org.apache.spark.streaming.akka.ActorReceiver
/**
* A receiver to subscribe to ZeroMQ stream.
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 63cd8a2..1784d6e 100644
--- a/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -20,29 +20,33 @@ package org.apache.spark.streaming.zeromq
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import akka.actor.{Props, SupervisorStrategy}
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
import akka.util.ByteString
import akka.zeromq.Subscribe
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
object ZeroMQUtils {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param ssc StreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param ssc StreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
* and each frame has sequence of byte thus it needs the converter
* (which might be deserializer of bytes) to translate from sequence
* of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping (default:
+ * ActorReceiver.defaultActorSystemCreator)
+ * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy)
*/
def createStream[T: ClassTag](
ssc: StreamingContext,
@@ -50,22 +54,31 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+ supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
): ReceiverInputDStream[T] = {
- ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ AkkaUtils.createStream(
+ ssc,
+ Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver",
+ storageLevel,
+ actorSystemCreator,
+ supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might be
* deserializer of bytes) to translate from sequence of sequence of bytes,
* where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel Storage level to use for storing the received objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping.
+ * @param supervisorStrategy the supervisor strategy
*/
def createStream[T](
jssc: JavaStreamingContext,
@@ -73,25 +86,33 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
+ actorSystemCreator: JFunction0[ActorSystem],
supervisorStrategy: SupervisorStrategy
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn,
+ storageLevel,
+ () => actorSystemCreator.call(),
+ supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might be
* deserializer of bytes) to translate from sequence of sequence of bytes,
* where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel RDD storage level.
+ * @param storageLevel RDD storage level.
*/
def createStream[T](
jssc: JavaStreamingContext,
@@ -104,14 +125,19 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn,
+ storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might
* be deserializer of bytes) to translate from sequence of sequence of
@@ -128,6 +154,10 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn)
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 417b91e..9ff4b41 100644
--- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/streaming-zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,14 +17,17 @@
package org.apache.spark.streaming.zeromq;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
+import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
+import org.junit.Test;
+
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
public void testZeroMQStream() {
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
- Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
- @Override
- public Iterable<String> call(byte[][] bytes) throws Exception {
- return null;
- }
- };
+ Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
+ Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects);
JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
- ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
SupervisorStrategy.defaultStrategy());
}
}
+
+class BytesToObjects implements Function<byte[][], Iterable<String>> {
+ @Override
+ public Iterable<String> call(byte[][] bytes) throws Exception {
+ return null;
+ }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+ @Override
+ public ActorSystem call() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3591971/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 35d2e62..bac2679 100644
--- a/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/streaming-zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite {
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+ StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
+ val test4: ReceiverInputDStream[String] =
+ ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects,
+ StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
- // TODO: Actually test data receiving
+ // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
ssc.stop()
}
}