Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/08 07:22:10 UTC

[08/13] git commit: Changed JavaStreamingContextWith*** classes to ***Functions classes in the streaming.api.java.*** packages. Also fixed the packages of the Flume and MQTT tests.

Changed JavaStreamingContextWith*** classes to ***Functions classes in the streaming.api.java.*** packages. Also fixed the packages of the Flume and MQTT tests.
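At the API level, this replaces the JavaStreamingContextWith*** subclasses of JavaStreamingContext with plain ***Functions wrapper classes that only hold a reference to the context. A minimal, hedged Java sketch of the new pattern, using the Flume wrapper from this diff (the class name, local[2] master, host, and port are illustrative, not part of this commit):

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    public class FlumeWrapperSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[2]", "FlumeWrapperSketch", new Duration(1000));

        // Old API (removed below): new JavaStreamingContextWithFlume(ssc).flumeStream(...)
        // New API: a wrapper that delegates to the underlying StreamingContext
        FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
        JavaDStream<SparkFlumeEvent> events = flumeFunc.flumeStream("localhost", 12345);
        events.count().print();

        // The wrappers no longer extend JavaStreamingContext, so context methods
        // such as socketTextStream() stay on ssc itself (the test suites below
        // drop their corresponding checks)
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);

        ssc.start();
      }
    }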


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d0fd3b9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d0fd3b9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d0fd3b9a

Branch: refs/heads/master
Commit: d0fd3b9ad238294346eb3465c489eabd41fb2380
Parents: 977bcc3
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jan 6 01:47:53 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 6 01:47:53 2014 -0800

----------------------------------------------------------------------
 .../streaming/examples/JavaFlumeEventCount.java |   6 +-
 .../streaming/examples/JavaKafkaWordCount.java  |   6 +-
 .../api/java/flume/FlumeFunctions.scala         |  48 ++++++++
 .../flume/JavaStreamingContextWithFlume.scala   |  48 --------
 .../src/test/java/JavaFlumeStreamSuite.java     |  38 -------
 .../streaming/flume/JavaFlumeStreamSuite.java   |  35 ++++++
 .../api/java/kafka/KafkaFunctions.scala         | 107 ++++++++++++++++++
 .../kafka/JavaStreamingContextWithKafka.scala   | 107 ------------------
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  15 ++-
 .../mqtt/JavaStreamingContextWithMQTT.scala     |  59 ----------
 .../spark/streaming/mqtt/MQTTFunctions.scala    |  43 --------
 .../spark/streaming/mqtt/MQTTInputDStream.scala | 110 -------------------
 .../spark/spark/streaming/mqtt/package.scala    |  24 ----
 .../streaming/api/java/mqtt/MQTTFunctions.scala |  59 ++++++++++
 .../spark/streaming/mqtt/MQTTFunctions.scala    |  43 ++++++++
 .../spark/streaming/mqtt/MQTTInputDStream.scala | 110 +++++++++++++++++++
 .../apache/spark/streaming/mqtt/package.scala   |  24 ++++
 .../streaming/mqtt/JavaMQTTStreamSuite.java     |  10 +-
 .../api/java/twitter/TwitterFunctions.scala     |  99 +++++++++++++++++
 .../JavaStreamingContextWithTwitter.scala       |  99 -----------------
 .../twitter/JavaTwitterStreamSuite.java         |  19 ++--
 .../api/java/zeromq/ZeroMQFunctions.scala       | 102 +++++++++++++++++
 .../zeromq/JavaStreamingContextWithZeroMQ.scala | 102 -----------------
 .../streaming/zeromq/JavaZeroMQStreamSuite.java |  12 +-
 24 files changed, 658 insertions(+), 667 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index e53c4f9..64832a9 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.*;
 import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
 import org.apache.spark.streaming.flume.SparkFlumeEvent;
 
 /**
@@ -52,8 +52,8 @@ public class JavaFlumeEventCount {
 
     JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
             System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-    JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
-    JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", port);
+    FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+    JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port);
 
     flumeStream.count();
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index de0420c..207ce8c 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka;
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
 import scala.Tuple2;
 
 /**
@@ -64,8 +64,8 @@ public class JavaKafkaWordCount {
       topicMap.put(topic, numThreads);
     }
 
-    JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
-    JavaPairDStream<String, String> messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap);
+    KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
+    JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap);
 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
new file mode 100644
index 0000000..3347d19
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.flume
+
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.flume._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating Flume input streams.
+ */
+class FlumeFunctions(javaStreamingContext: JavaStreamingContext) {
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port     Port of the slave machine to which the flume data will be sent
+   */
+  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
+    javaStreamingContext.ssc.flumeStream(hostname, port)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port     Port of the slave machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   */
+  def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+    JavaDStream[SparkFlumeEvent] = {
+    javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
deleted file mode 100644
index 4e66ae3..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Flume input streams.
- */
-class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext)
-  extends JavaStreamingContext(javaStreamingContext.ssc) {
-  /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   */
-  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
-    ssc.flumeStream(hostname, port)
-  }
-
-  /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
-    JavaDStream[SparkFlumeEvent] = {
-    ssc.flumeStream(hostname, port, storageLevel)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/test/java/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/JavaFlumeStreamSuite.java
deleted file mode 100644
index deffc78..0000000
--- a/external/flume/src/test/java/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
-  @Test
-  public void testFlumeStream() {
-    JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
-
-    // tests the API, does not actually test data receiving
-    JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345);
-    JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345,
-      StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000..5930fee
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+  @Test
+  public void testFlumeStream() {
+    FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+
+    // tests the API, does not actually test data receiving
+    JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345);
+    JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
new file mode 100644
index 0000000..491331b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.kafka
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.lang.{Integer => JInt}
+import java.util.{Map => JMap}
+
+import kafka.serializer.Decoder
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.kafka._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating Kafka input streams.
+ */
+class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that pulls messages from a Kafka broker.
+   * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   */
+  def kafkaStream(
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt]
+    ): JavaPairDStream[String, String] = {
+      implicit val cmt: ClassTag[String] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+      javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka broker.
+   * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   *               in its own thread.
+   * @param storageLevel RDD storage level.
+   *
+   */
+  def kafkaStream(
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairDStream[String, String] = {
+    implicit val cmt: ClassTag[String] =
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka broker.
+   * @param keyTypeClass Key type of RDD
+   * @param valueTypeClass value type of RDD
+   * @param keyDecoderClass Type of kafka key decoder
+   * @param valueDecoderClass Type of kafka value decoder
+   * @param kafkaParams Map of Kafka configuration parameters.
+   *                    See: http://kafka.apache.org/configuration.html
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param storageLevel RDD storage level. Defaults to memory-only
+   */
+  def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+      keyTypeClass: Class[K],
+      valueTypeClass: Class[V],
+      keyDecoderClass: Class[U],
+      valueDecoderClass: Class[T],
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairDStream[K, V] = {
+    implicit val keyCmt: ClassTag[K] =
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+    implicit val valueCmt: ClassTag[V] =
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+
+    implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+    implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+    javaStreamingContext.ssc.kafkaStream[K, V, U, T](
+      kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+  }
+}
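The generically-typed overload above threads the Scala type parameters through explicit Class arguments. A hedged Java sketch of how it might be called, mirroring the StringDecoder usage in the test suite below (the class name, master, ZooKeeper quorum, group id, and topic are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import kafka.serializer.StringDecoder;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;

    public class KafkaWrapperSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[2]", "KafkaWrapperSketch", new Duration(2000));
        KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);

        // One consumer thread for a hypothetical topic "events"
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("events", 1);

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zookeeper.connect", "localhost:2181");
        kafkaParams.put("group.id", "consumer-group");

        // Key/value classes and their Kafka decoders are passed explicitly
        JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(
            String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());

        messages.print();
        ssc.start();
      }
    }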

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
deleted file mode 100644
index ab0e8a6..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-import java.lang.{Integer => JInt}
-import java.util.{Map => JMap}
-
-import kafka.serializer.Decoder
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Kafka input streams.
- */
-class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
-  extends JavaStreamingContext(javaStreamingContext.ssc) {
-
-  /**
-   * Create an input stream that pulls messages form a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   * in its own thread.
-   */
-  def kafkaStream(
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt]
-    ): JavaPairDStream[String, String] = {
-      implicit val cmt: ClassTag[String] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-      ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /**
-   * Create an input stream that pulls messages form a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *               in its own thread.
-   * @param storageLevel RDD storage level.
-   *
-   */
-  def kafkaStream(
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairDStream[String, String] = {
-    implicit val cmt: ClassTag[String] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages form a Kafka Broker.
-   * @param keyTypeClass Key type of RDD
-   * @param valueTypeClass value type of RDD
-   * @param keyDecoderClass Type of kafka key decoder
-   * @param valueDecoderClass Type of kafka value decoder
-   * @param kafkaParams Map of kafka configuration paramaters.
-   *                    See: http://kafka.apache.org/configuration.html
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   * in its own thread.
-   * @param storageLevel RDD storage level. Defaults to memory-only
-   */
-  def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
-      keyTypeClass: Class[K],
-      valueTypeClass: Class[V],
-      keyDecoderClass: Class[U],
-      valueDecoderClass: Class[T],
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val valueCmt: ClassTag[V] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
-    implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
-    implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
-    ssc.kafkaStream[K, V, U, T](
-      kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 66236df..fdea96e 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,6 +18,8 @@
 package org.apache.spark.streaming.kafka;
 
 import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
 import org.junit.Test;
 import com.google.common.collect.Maps;
 import kafka.serializer.StringDecoder;
@@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   public void testKafkaStream() {
 
     HashMap<String, Integer> topics = Maps.newHashMap();
-    JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
+    KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
 
     // tests the API, does not actually test data receiving
-    JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics);
-    JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics,
+    JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
+    JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
       StorageLevel.MEMORY_AND_DISK_SER_2());
 
     HashMap<String, String> kafkaParams = Maps.newHashMap();
-    kafkaParams.put("zookeeper.connect","localhost:12345");
+    kafkaParams.put("zookeeper.connect", "localhost:12345");
     kafkaParams.put("group.id","consumer-group");
-    JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream(
+    JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
       String.class, String.class, StringDecoder.class, StringDecoder.class,
       kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
deleted file mode 100644
index d814da0..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating MQTT input streams.
- */
-class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
-  extends JavaStreamingContext(javaStreamingContext.ssc) {
-
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param brokerUrl Url of remote MQTT publisher
-   * @param topic topic name to subscribe to
-   */
-  def mqttStream(
-      brokerUrl: String,
-      topic: String
-    ): JavaDStream[String] = {
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    ssc.mqttStream(brokerUrl, topic)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param brokerUrl Url of remote MQTT publisher
-   * @param topic topic name to subscribe to
-   * @param storageLevel RDD storage level.
-   */
-  def mqttStream(
-      brokerUrl: String,
-      topic: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): JavaDStream[String] = {
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    ssc.mqttStream(brokerUrl, topic, storageLevel)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
deleted file mode 100644
index 86f4e9c..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
- */
-class MQTTFunctions(ssc: StreamingContext) {
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param brokerUrl Url of remote MQTT publisher
-   * @param topic topic name to subscribe to
-   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
-   */
-  def mqttStream(
-      brokerUrl: String,
-      topic: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[String] = {
-    val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
-    ssc.registerInputStream(inputStream)
-    inputStream
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
deleted file mode 100644
index c8987a3..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-
-/**
- * Input stream that subscribe messages from a Mqtt Broker.
- * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-
-private[streaming] 
-class MQTTInputDStream[T: ClassTag](
-    @transient ssc_ : StreamingContext,
-    brokerUrl: String,
-    topic: String,
-    storageLevel: StorageLevel
-  ) extends NetworkInputDStream[T](ssc_) with Logging {
-  
-  def getReceiver(): NetworkReceiver[T] = {
-    new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
-  }
-}
-
-private[streaming] 
-class MQTTReceiver(brokerUrl: String,
-  topic: String,
-  storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any] {
-  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-  
-  def onStop() {
-    blockGenerator.stop()
-  }
-  
-  def onStart() {
-
-    blockGenerator.start()
-
-    // Set up persistence for messages 
-    var peristance: MqttClientPersistence = new MemoryPersistence()
-
-    // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
-    var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
-
-    // Connect to MqttBroker    
-    client.connect()
-
-    // Subscribe to Mqtt topic
-    client.subscribe(topic)
-
-    // Callback automatically triggers as and when new message arrives on specified topic
-    var callback: MqttCallback = new MqttCallback() {
-
-      // Handles Mqtt message 
-      override def messageArrived(arg0: String, arg1: MqttMessage) {
-        blockGenerator += new String(arg1.getPayload())
-      }
-
-      override def deliveryComplete(arg0: IMqttDeliveryToken) {
-      }
-
-      override def connectionLost(arg0: Throwable) {
-        logInfo("Connection lost " + arg0)
-      }
-    }
-
-    // Set up callback for MqttClient
-    client.setCallback(callback)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
deleted file mode 100644
index 28a944f..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object mqtt {
-  implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000..7212495
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.mqtt
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.mqtt._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating MQTT input streams.
+ */
+class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that receives messages pushed by an MQTT publisher.
+   * @param brokerUrl URL of the remote MQTT publisher
+   * @param topic topic name to subscribe to
+   */
+  def mqttStream(
+      brokerUrl: String,
+      topic: String
+    ): JavaDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by an MQTT publisher.
+   * @param brokerUrl URL of the remote MQTT publisher
+   * @param topic topic name to subscribe to
+   * @param storageLevel RDD storage level.
+   */
+  def mqttStream(
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): JavaDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
+  }
+}
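As with the other wrappers, Java callers construct MQTTFunctions around an existing context. A hedged sketch (the tcp:// broker URL and topic are placeholders, not values from this commit; the suite further below exercises the same two overloads with dummy strings):

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;

    public class MqttWrapperSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[2]", "MqttWrapperSketch", new Duration(1000));
        MQTTFunctions mqttFunc = new MQTTFunctions(ssc);

        // Placeholder broker URL and topic name
        JavaDStream<String> messages = mqttFunc.mqttStream("tcp://localhost:1883", "sensors");
        messages.print();
        ssc.start();
      }
    }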

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000..86f4e9c
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
+ */
+class MQTTFunctions(ssc: StreamingContext) {
+  /**
+   * Create an input stream that receives messages pushed by an MQTT publisher.
+   * @param brokerUrl URL of the remote MQTT publisher
+   * @param topic topic name to subscribe to
+   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   */
+  def mqttStream(
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): DStream[String] = {
+    val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+    ssc.registerInputStream(inputStream)
+    inputStream
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
new file mode 100644
index 0000000..c8987a3
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import java.util.Properties
+import java.util.concurrent.Executors
+import java.io.IOException
+
+import org.eclipse.paho.client.mqttv3.MqttCallback
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+
+/**
+ * Input stream that subscribes to messages from an MQTT broker.
+ * Uses the Eclipse Paho MqttClient (http://www.eclipse.org/paho/).
+ * @param brokerUrl URL of the remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+
+private[streaming] 
+class MQTTInputDStream[T: ClassTag](
+    @transient ssc_ : StreamingContext,
+    brokerUrl: String,
+    topic: String,
+    storageLevel: StorageLevel
+  ) extends NetworkInputDStream[T](ssc_) with Logging {
+  
+  def getReceiver(): NetworkReceiver[T] = {
+    new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+  }
+}
+
+private[streaming] 
+class MQTTReceiver(brokerUrl: String,
+  topic: String,
+  storageLevel: StorageLevel
+  ) extends NetworkReceiver[Any] {
+  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+  
+  def onStop() {
+    blockGenerator.stop()
+  }
+  
+  def onStart() {
+
+    blockGenerator.start()
+
+    // Set up in-memory persistence for messages
+    val persistence: MqttClientPersistence = new MemoryPersistence()
+
+    // Initialize the MQTT client with the broker URL, a client ID, and the persistence store
+    val client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", persistence)
+
+    // Connect to MqttBroker    
+    client.connect()
+
+    // Subscribe to Mqtt topic
+    client.subscribe(topic)
+
+    // Callback triggered whenever a new message arrives on the subscribed topic
+    val callback: MqttCallback = new MqttCallback() {
+
+      // Handles Mqtt message 
+      override def messageArrived(arg0: String, arg1: MqttMessage) {
+        blockGenerator += new String(arg1.getPayload())
+      }
+
+      override def deliveryComplete(arg0: IMqttDeliveryToken) {
+      }
+
+      override def connectionLost(arg0: Throwable) {
+        logInfo("Connection lost " + arg0)
+      }
+    }
+
+    // Set up callback for MqttClient
+    client.setCallback(callback)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
new file mode 100644
index 0000000..28a944f
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object mqtt {
+  implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index c1f4164..3ddb4d0 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt;
 
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
 import org.junit.Test;
 
 import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -28,14 +29,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
   public void testMQTTStream() {
     String brokerUrl = "abc";
     String topic = "def";
-    JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc);
+    MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
 
     // tests the API, does not actually test data receiving
-    JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic);
-    JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic,
+    JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
+    JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
       StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
new file mode 100644
index 0000000..22e297a
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating Twitter input streams.
+ */
+class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   * twitter4j.oauth.accessTokenSecret.
+   */
+  def twitterStream(): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(None)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   * twitter4j.oauth.accessTokenSecret.
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(filters: Array[String]): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(None, filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   * twitter4j.oauth.accessTokenSecret.
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param twitterAuth Twitter4J Authorization
+   */
+  def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param twitterAuth Twitter4J Authorization
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(
+      twitterAuth: Authorization,
+      filters: Array[String]
+    ): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param twitterAuth Twitter4J Authorization object
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+      twitterAuth: Authorization,
+      filters: Array[String],
+      storageLevel: StorageLevel
+    ): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+  }
+}
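
For reference, a minimal Java sketch of the new wrapper-based Twitter API (not part of this commit; the class name, app name, and "spark" filter are placeholders, it assumes the twitter4j.oauth.* system properties are set for default OAuth, and it mirrors the updated test suite further down):

    import twitter4j.Status;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;

    public class TwitterFunctionsExample {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
          new JavaStreamingContext("local[2]", "TwitterFunctionsExample", new Duration(1000));

        // The new API wraps the context instead of subclassing it, so core methods
        // (e.g. ssc.socketTextStream) remain on the JavaStreamingContext itself.
        TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
        JavaDStream<Status> allTweets = twitterFunc.twitterStream();
        JavaDStream<Status> filtered = twitterFunc.twitterStream(new String[]{ "spark" });

        ssc.start();
      }
    }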

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
deleted file mode 100644
index 0250364..0000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j.Status
-import twitter4j.auth.Authorization
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Twitter input streams.
- */
-class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext)
-  extends JavaStreamingContext(javaStreamingContext.ssc) {
-
-  /**
-   * Create a input stream that returns tweets received from Twitter using Twitter4J's default
-   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
-   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
-   * twitter4j.oauth.accessTokenSecret.
-   */
-  def twitterStream(): JavaDStream[Status] = {
-    ssc.twitterStream(None)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter using Twitter4J's default
-   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
-   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
-   * twitter4j.oauth.accessTokenSecret.
-   * @param filters Set of filter strings to get only those tweets that match them
-   */
-  def twitterStream(filters: Array[String]): JavaDStream[Status] = {
-    ssc.twitterStream(None, filters)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter using Twitter4J's default
-   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
-   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
-   * twitter4j.oauth.accessTokenSecret.
-   * @param filters Set of filter strings to get only those tweets that match them
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
-    ssc.twitterStream(None, filters, storageLevel)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter.
-   * @param twitterAuth Twitter4J Authorization
-   */
-  def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
-    ssc.twitterStream(Some(twitterAuth))
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter.
-   * @param twitterAuth Twitter4J Authorization
-   * @param filters Set of filter strings to get only those tweets that match them
-   */
-  def twitterStream(
-      twitterAuth: Authorization,
-      filters: Array[String]
-    ): JavaDStream[Status] = {
-    ssc.twitterStream(Some(twitterAuth), filters)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter.
-   * @param twitterAuth Twitter4J Authorization object
-   * @param filters Set of filter strings to get only those tweets that match them
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def twitterStream(
-      twitterAuth: Authorization,
-      filters: Array[String],
-      storageLevel: StorageLevel
-    ): JavaDStream[Status] = {
-    ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 34e4fbd..4564d6c 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -18,6 +18,8 @@
 package org.apache.spark.streaming.twitter;
 
 import java.util.Arrays;
+
+import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;
 import org.junit.Test;
 
 import twitter4j.Status;
@@ -31,21 +33,18 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testTwitterStream() {
-    JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc);
+    TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
     String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
     Authorization auth = NullAuthorization.getInstance();
 
     // tests the API, does not actually test data receiving
-    JavaDStream<Status> test1 = sscWithTwitter.twitterStream();
-    JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters);
+    JavaDStream<Status> test1 = twitterFunc.twitterStream();
+    JavaDStream<Status> test2 = twitterFunc.twitterStream(filters);
     JavaDStream<Status> test3 =
-      sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth);
-    JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters);
+      twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaDStream<Status> test4 = twitterFunc.twitterStream(auth);
+    JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters);
     JavaDStream<Status> test6 =
-      sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999);
+      twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
new file mode 100644
index 0000000..a9bbce7
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.zeromq._
+
+/**
+ * Wrapper for [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating ZeroMQ input streams from Java.
+ */
+class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
+   *                       each frame is a sequence of bytes; this converter (typically a
+   *                       deserializer) translates the sequence of frame payloads, i.e. a
+   *                       sequence of byte sequences, into an iterable of objects.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
+   *                       each frame is a sequence of bytes; this converter (typically a
+   *                       deserializer) translates the sequence of frame payloads, i.e. a
+   *                       sequence of byte sequences, into an iterable of objects.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
+   *                       each frame is a sequence of bytes; this converter (typically a
+   *                       deserializer) translates the sequence of frame payloads, i.e. a
+   *                       sequence of byte sequences, into an iterable of objects.
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+  }
+}
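
A comparable sketch for the ZeroMQ wrapper (again not part of this commit; the class name, publisher URL, and topic are placeholders, and the converter mirrors the one in the updated test suite further down, decoding each frame with the platform default charset):

    import java.util.Arrays;

    import akka.util.ByteString;
    import akka.zeromq.Subscribe;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;

    public class ZeroMQFunctionsExample {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
          new JavaStreamingContext("local[2]", "ZeroMQFunctionsExample", new Duration(1000));
        ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);

        String publisherUrl = "tcp://127.0.0.1:5555";  // placeholder URL
        Subscribe subscribe = new Subscribe(ByteString.fromString("topic"));  // placeholder topic

        // Decode the frames of each published message (byte[][]) into Strings.
        Function<byte[][], Iterable<String>> bytesToObjects =
          new Function<byte[][], Iterable<String>>() {
            @Override
            public Iterable<String> call(byte[][] frames) {
              String[] decoded = new String[frames.length];
              for (int i = 0; i < frames.length; i++) {
                decoded[i] = new String(frames[i]);
              }
              return Arrays.asList(decoded);
            }
          };

        JavaDStream<String> lines = zeromqFunc.<String>zeroMQStream(
          publisherUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());

        ssc.start();
      }
    }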

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
deleted file mode 100644
index dc5d1f0..0000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-import akka.actor.SupervisorStrategy
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating ZeroMQ input streams.
- */
-class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
-  extends JavaStreamingContext(javaStreamingContext.ssc) {
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote ZeroMQ publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def zeroMQStream[T](
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel,
-      supervisorStrategy: SupervisorStrategy
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level.
-   */
-  def zeroMQStream[T](
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   */
-  def zeroMQStream[T](
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0fd3b9a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 96af7d7..b020ae4 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.zeromq;
 
+import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
 import org.junit.Test;
 
 import akka.actor.SupervisorStrategy;
@@ -32,7 +33,7 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
 
   @Test // tests the API, does not actually test data receiving
   public void testZeroMQStream() {
-    JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
+    ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
     String publishUrl = "abc";
     Subscribe subscribe = new Subscribe((ByteString)null);
     Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -42,14 +43,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
       }
     };
 
-    JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream(
+    JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
       publishUrl, subscribe, bytesToObjects);
-    JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream(
+    JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
       publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream(
+    JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
       publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
   }
 }