You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/08 07:22:10 UTC
[08/13] git commit: Changed JavaStreamingContextWith*** to
***Function in streaming.api.java.*** package. Also fixed packages of Flume
and MQTT tests.
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d0fd3b9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d0fd3b9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d0fd3b9a
Branch: refs/heads/master
Commit: d0fd3b9ad238294346eb3465c489eabd41fb2380
Parents: 977bcc3
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jan 6 01:47:53 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 6 01:47:53 2014 -0800
----------------------------------------------------------------------
.../streaming/examples/JavaFlumeEventCount.java | 6 +-
.../streaming/examples/JavaKafkaWordCount.java | 6 +-
.../api/java/flume/FlumeFunctions.scala | 48 ++++++++
.../flume/JavaStreamingContextWithFlume.scala | 48 --------
.../src/test/java/JavaFlumeStreamSuite.java | 38 -------
.../streaming/flume/JavaFlumeStreamSuite.java | 35 ++++++
.../api/java/kafka/KafkaFunctions.scala | 107 ++++++++++++++++++
.../kafka/JavaStreamingContextWithKafka.scala | 107 ------------------
.../streaming/kafka/JavaKafkaStreamSuite.java | 15 ++-
.../mqtt/JavaStreamingContextWithMQTT.scala | 59 ----------
.../spark/streaming/mqtt/MQTTFunctions.scala | 43 --------
.../spark/streaming/mqtt/MQTTInputDStream.scala | 110 -------------------
.../spark/spark/streaming/mqtt/package.scala | 24 ----
.../streaming/api/java/mqtt/MQTTFunctions.scala | 59 ++++++++++
.../spark/streaming/mqtt/MQTTFunctions.scala | 43 ++++++++
.../spark/streaming/mqtt/MQTTInputDStream.scala | 110 +++++++++++++++++++
.../apache/spark/streaming/mqtt/package.scala | 24 ++++
.../streaming/mqtt/JavaMQTTStreamSuite.java | 10 +-
.../api/java/twitter/TwitterFunctions.scala | 99 +++++++++++++++++
.../JavaStreamingContextWithTwitter.scala | 99 -----------------
.../twitter/JavaTwitterStreamSuite.java | 19 ++--
.../api/java/zeromq/ZeroMQFunctions.scala | 102 +++++++++++++++++
.../zeromq/JavaStreamingContextWithZeroMQ.scala | 102 -----------------
.../streaming/zeromq/JavaZeroMQStreamSuite.java | 12 +-
24 files changed, 658 insertions(+), 667 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index e53c4f9..64832a9 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
@@ -52,8 +52,8 @@ public class JavaFlumeEventCount {
JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
- JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", port);
+ FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+ JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port);
flumeStream.count();
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index de0420c..207ce8c 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka;
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import scala.Tuple2;
/**
@@ -64,8 +64,8 @@ public class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
- JavaPairDStream<String, String> messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap);
+ KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
+ JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
new file mode 100644
index 0000000..3347d19
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.flume
+
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.flume._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating Flume input streams.
+ */
+class FlumeFunctions(javaStreamingContext: JavaStreamingContext) {
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
+ javaStreamingContext.ssc.flumeStream(hostname, port)
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+ JavaDStream[SparkFlumeEvent] = {
+ javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
deleted file mode 100644
index 4e66ae3..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Flume input streams.
- */
-class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
- /**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- */
- def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port)
- }
-
- /**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
- JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port, storageLevel)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/test/java/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/JavaFlumeStreamSuite.java
deleted file mode 100644
index deffc78..0000000
--- a/external/flume/src/test/java/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
-
- // tests the API, does not actually test data receiving
- JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345);
- JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000..5930fee
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,35 @@
+package org.apache.spark.streaming.flume;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345);
+ JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
new file mode 100644
index 0000000..491331b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.kafka
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.lang.{Integer => JInt}
+import java.util.{Map => JMap}
+
+import kafka.serializer.Decoder
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.kafka._
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating Kafka input streams.
+ */
+class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt]
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ *
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param keyTypeClass Key type of RDD
+ * @param valueTypeClass value type of RDD
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
+ * @param kafkaParams Map of kafka configuration paramaters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level. Defaults to memory-only
+ */
+ def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val valueCmt: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+
+ implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+ implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+ javaStreamingContext.ssc.kafkaStream[K, V, U, T](
+ kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
deleted file mode 100644
index ab0e8a6..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-import java.lang.{Integer => JInt}
-import java.util.{Map => JMap}
-
-import kafka.serializer.Decoder
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Kafka input streams.
- */
-class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt]
- ): JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- *
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
- * @param keyDecoderClass Type of kafka key decoder
- * @param valueDecoderClass Type of kafka value decoder
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level. Defaults to memory-only
- */
- def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
- keyTypeClass: Class[K],
- valueTypeClass: Class[V],
- keyDecoderClass: Class[U],
- valueDecoderClass: Class[T],
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
- ssc.kafkaStream[K, V, U, T](
- kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 66236df..fdea96e 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
@@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
+ KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream(
+ JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
deleted file mode 100644
index d814da0..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating MQTT input streams.
- */
-class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
-
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
- */
- def mqttStream(
- brokerUrl: String,
- topic: String
- ): JavaDStream[String] = {
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.mqttStream(brokerUrl, topic)
- }
-
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
- def mqttStream(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): JavaDStream[String] = {
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.mqttStream(brokerUrl, topic, storageLevel)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
deleted file mode 100644
index 86f4e9c..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
- */
-class MQTTFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- */
- def mqttStream(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
- val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
deleted file mode 100644
index c8987a3..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-
-/**
- * Input stream that subscribe messages from a Mqtt Broker.
- * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-
-private[streaming]
-class MQTTInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
- }
-}
-
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
- def onStop() {
- blockGenerator.stop()
- }
-
- def onStart() {
-
- blockGenerator.start()
-
- // Set up persistence for messages
- var peristance: MqttClientPersistence = new MemoryPersistence()
-
- // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
-
- // Connect to MqttBroker
- client.connect()
-
- // Subscribe to Mqtt topic
- client.subscribe(topic)
-
- // Callback automatically triggers as and when new message arrives on specified topic
- var callback: MqttCallback = new MqttCallback() {
-
- // Handles Mqtt message
- override def messageArrived(arg0: String, arg1: MqttMessage) {
- blockGenerator += new String(arg1.getPayload())
- }
-
- override def deliveryComplete(arg0: IMqttDeliveryToken) {
- }
-
- override def connectionLost(arg0: Throwable) {
- logInfo("Connection lost " + arg0)
- }
- }
-
- // Set up callback for MqttClient
- client.setCallback(callback)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
deleted file mode 100644
index 28a944f..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object mqtt {
- implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000..7212495
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.mqtt
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.mqtt._
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating MQTT input streams.
+ */
+class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic topic name to subscribe to
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String
+ ): JavaDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): JavaDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000..86f4e9c
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
+ */
+class MQTTFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
new file mode 100644
index 0000000..c8987a3
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import java.util.Properties
+import java.util.concurrent.Executors
+import java.io.IOException
+
+import org.eclipse.paho.client.mqttv3.MqttCallback
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+
+/**
+ * Input stream that subscribe messages from a Mqtt Broker.
+ * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+
+private[streaming]
+class MQTTInputDStream[T: ClassTag](
+ @transient ssc_ : StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_) with Logging {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ }
+}
+
+private[streaming]
+class MQTTReceiver(brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+
+ def onStop() {
+ blockGenerator.stop()
+ }
+
+ def onStart() {
+
+ blockGenerator.start()
+
+ // Set up persistence for messages
+ var peristance: MqttClientPersistence = new MemoryPersistence()
+
+ // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+ var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
+
+ // Connect to MqttBroker
+ client.connect()
+
+ // Subscribe to Mqtt topic
+ client.subscribe(topic)
+
+ // Callback automatically triggers as and when new message arrives on specified topic
+ var callback: MqttCallback = new MqttCallback() {
+
+ // Handles Mqtt message
+ override def messageArrived(arg0: String, arg1: MqttMessage) {
+ blockGenerator += new String(arg1.getPayload())
+ }
+
+ override def deliveryComplete(arg0: IMqttDeliveryToken) {
+ }
+
+ override def connectionLost(arg0: Throwable) {
+ logInfo("Connection lost " + arg0)
+ }
+ }
+
+ // Set up callback for MqttClient
+ client.setCallback(callback)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
new file mode 100644
index 0000000..28a944f
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object mqtt {
+ implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index c1f4164..3ddb4d0 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -28,14 +29,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
- JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc);
+ MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic);
- JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic,
+ JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
+ JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
new file mode 100644
index 0000000..22e297a
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating Twitter input streams.
+ */
+class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ */
+ def twitterStream(): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(None)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(filters: Array[String]): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(None, filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ */
+ def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization object
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
deleted file mode 100644
index 0250364..0000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j.Status
-import twitter4j.auth.Authorization
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Twitter input streams.
- */
-class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- */
- def twitterStream(): JavaDStream[Status] = {
- ssc.twitterStream(None)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(filters: Array[String]): JavaDStream[Status] = {
- ssc.twitterStream(None, filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
- ssc.twitterStream(None, filters, storageLevel)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- */
- def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth))
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String]
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 34e4fbd..4564d6c 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.twitter;
import java.util.Arrays;
+
+import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;
import org.junit.Test;
import twitter4j.Status;
@@ -31,21 +33,18 @@ import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
- JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc);
+ TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
- JavaDStream<Status> test1 = sscWithTwitter.twitterStream();
- JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters);
+ JavaDStream<Status> test1 = twitterFunc.twitterStream();
+ JavaDStream<Status> test2 = twitterFunc.twitterStream(filters);
JavaDStream<Status> test3 =
- sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth);
- JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters);
+ twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = twitterFunc.twitterStream(auth);
+ JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters);
JavaDStream<Status> test6 =
- sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999);
+ twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
new file mode 100644
index 0000000..a9bbce7
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.zeromq._
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating ZeroMQ input streams.
+ */
+class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level.
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
deleted file mode 100644
index dc5d1f0..0000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-import akka.actor.SupervisorStrategy
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating ZeroMQ input streams.
- */
-class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def zeroMQStream[T](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel,
- supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level.
- */
- def zeroMQStream[T](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- */
- def zeroMQStream[T](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 96af7d7..b020ae4 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.zeromq;
+import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
import org.junit.Test;
import akka.actor.SupervisorStrategy;
@@ -32,7 +33,7 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@Test // tests the API, does not actually test data receiving
public void testZeroMQStream() {
- JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
+ ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -42,14 +43,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
}
}