Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/08 07:22:04 UTC

[02/13] Refactored kafka, flume, zeromq, mqtt into separate external projects, each with its own self-contained Scala API, Java API, Scala unit tests, and Java unit tests. Updated the examples to use the external projects.
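As a rough sketch of the usage pattern this refactoring enables: each connector now lives in its own sub-project and exposes its stream-creation method on StreamingContext through an implicit conversion from its package object (the ZeroMQ one is shown in full below; the Twitter equivalent is assumed here by analogy). A Twitter application would then look roughly like the following, with the master URL, app name, batch interval and filters as placeholders and OAuth credentials supplied through the usual twitter4j.oauth.* system properties:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter._   // assumed: adds twitterStream to StreamingContext

    val ssc = new StreamingContext("local[2]", "TwitterExample", Seconds(2))  // placeholders
    // Passing None makes Twitter4J fall back to the twitter4j.oauth.* system properties
    val tweets = ssc.twitterStream(None, Seq("spark", "streaming"))
    tweets.map(_.getText).print()
    ssc.start()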

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
new file mode 100644
index 0000000..d7f6d35
--- /dev/null
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+import twitter4j.auth.{NullAuthorization, Authorization}
+
+class TwitterStreamSuite extends TestSuiteBase {
+
+  test("kafka input stream") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val filters = Seq("filter1", "filter2")
+    val authorization: Authorization = NullAuthorization.getInstance()
+
+    // tests the API, does not actually test data receiving
+    val test1 = ssc.twitterStream(None)
+    val test2 = ssc.twitterStream(None, filters)
+    val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test4 = ssc.twitterStream(Some(authorization))
+    val test5 = ssc.twitterStream(Some(authorization), filters)
+    val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    // Note that actually testing data receiving is hard, as authentication keys are
+    // necessary for accessing the Twitter live stream.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
new file mode 100644
index 0000000..dc5d1f0
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating ZeroMQ input streams.
+ */
+class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
+  extends JavaStreamingContext(javaStreamingContext.ssc) {
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
+   *                       frame is a sequence of bytes. This converter (typically a deserializer)
+   *                       turns that sequence of byte sequences, where the outer sequence is a frame
+   *                       and the inner sequence is its payload, into an iterator of objects.
+   * @param storageLevel  Storage level to use for storing the received objects
+   * @param supervisorStrategy Supervisor strategy for the receiver actor
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
+   *                       frame is a sequence of bytes. This converter (typically a deserializer)
+   *                       turns that sequence of byte sequences, where the outer sequence is a frame
+   *                       and the inner sequence is its payload, into an iterator of objects.
+   * @param storageLevel RDD storage level.
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
+   *                       frame is a sequence of bytes. This converter (typically a deserializer)
+   *                       turns that sequence of byte sequences, where the outer sequence is a frame
+   *                       and the inner sequence is its payload, into an iterator of objects.
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
new file mode 100644
index 0000000..f4c75ab
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.receivers._
+
+/**
+ * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
+ */
+class ZeroMQFunctions(ssc: StreamingContext) {
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic,
+   *                       and each frame is a sequence of bytes. This converter (typically
+   *                       a deserializer) turns that sequence of byte sequences, where the
+   *                       outer sequence is a frame and the inner sequence is its payload,
+   *                       into an iterator of objects.
+   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param supervisorStrategy Supervisor strategy for the receiver actor. Defaults to
+   *                           ReceiverSupervisorStrategy.defaultStrategy.
+   */
+  def zeroMQStream[T: ClassTag](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+    ): DStream[T] = {
+    ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+        "ZeroMQReceiver", storageLevel, supervisorStrategy)
+  }
+}
+  

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
new file mode 100644
index 0000000..769761e
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+
+import akka.actor.Actor
+import akka.util.ByteString
+import akka.zeromq._
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receivers._
+
+/**
+ * A receiver that subscribes to a ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
+  subscribe: Subscribe,
+  bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
+  extends Actor with Receiver with Logging {
+
+  override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
+    Connect(publisherUrl), subscribe)
+
+  def receive: Receive = {
+
+    case Connecting ⇒ logInfo("connecting ...")
+
+    case m: ZMQMessage ⇒
+      logDebug("Received message for: " + m.frame(0))
+
+      // The first frame is the topic, so drop it and process only the payload frames
+      val bytes = m.frames.tail
+      pushBlock(bytesToObjects(bytes))
+
+    case Closed ⇒ logInfo("received closed ")
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
new file mode 100644
index 0000000..dc27178
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object zeromq {
+  implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
+}
+
+
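To make the bytesToObjects contract above concrete, here is a minimal, illustrative use of the implicit conversion just defined. The publisher URL and topic are placeholders, and each frame payload is simply decoded as a UTF-8 string (remember that the receiver drops the leading topic frame before calling the converter):

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.zeromq._   // sscToZeroMQFunctions adds zeroMQStream to StreamingContext

    val ssc = new StreamingContext("local[2]", "ZeroMQExample", Seconds(1))  // placeholders
    // Decode each payload frame as a UTF-8 string
    val bytesToObjects = (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator
    val lines = ssc.zeroMQStream("tcp://127.0.0.1:1234", Subscribe(ByteString("events")), bytesToObjects)
    lines.print()
    ssc.start()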

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
new file mode 100644
index 0000000..96af7d7
--- /dev/null
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq;
+
+import org.junit.Test;
+
+import akka.actor.SupervisorStrategy;
+import akka.util.ByteString;
+import akka.zeromq.Subscribe;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
+
+  @Test // tests the API, does not actually test data receiving
+  public void testZeroMQStream() {
+    JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
+    String publishUrl = "abc";
+    Subscribe subscribe = new Subscribe((ByteString)null);
+    Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
+      @Override
+      public Iterable<String> call(byte[][] bytes) throws Exception {
+        return null;
+      }
+    };
+
+    JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream(
+      publishUrl, subscribe, bytesToObjects);
+    JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream(
+      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream(
+      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
+
+    // To verify that JavaStreamingContextWithZeroMQ is also a JavaStreamingContext
+    JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
new file mode 100644
index 0000000..063529a
--- /dev/null
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
new file mode 100644
index 0000000..5adcdb8
--- /dev/null
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+
+class ZeroMQStreamSuite extends TestSuiteBase {
+
+  test("zeromq input stream") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val publishUrl = "abc"
+    val subscribe = new Subscribe(null.asInstanceOf[ByteString])
+    val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
+
+    // tests the API, does not actually test data receiving
+    val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
+    val test2 = ssc.zeroMQStream(
+      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
+      StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+
+    // TODO: Actually test data receiving
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f9ff781..4e92d2a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -49,9 +49,6 @@ object SparkBuild extends Build {
   lazy val repl = Project("repl", file("repl"), settings = replSettings)
     .dependsOn(core, bagel, mllib)
 
-  lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
-    .dependsOn(core, mllib, bagel, streaming, externalTwitter)
-
   lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
 
   lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
@@ -60,8 +57,6 @@ object SparkBuild extends Build {
 
   lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
 
-  lazy val externalTwitter = Project("streaming-twitter", file("external/twitter"), settings = twitterSettings) dependsOn(streaming)
-
   lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
     .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
 
@@ -94,10 +89,31 @@ object SparkBuild extends Build {
   lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
   lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
 
+  lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings) 
+    .dependsOn(streaming % "compile->compile;test->test")
+
+  lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
+    .dependsOn(streaming % "compile->compile;test->test")
+
+  lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
+    .dependsOn(streaming % "compile->compile;test->test")
+  
+  lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
+    .dependsOn(streaming % "compile->compile;test->test")
+  
+  lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
+    .dependsOn(streaming % "compile->compile;test->test")
+
+  lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+  lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+  
+  lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
+    .dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
+
   // Everything except assembly, tools and examples belong to packageProjects
   lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
 
-  lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj)
+  lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) 
 
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization       := "org.apache.spark",
@@ -167,7 +183,7 @@ object SparkBuild extends Build {
       </issueManagement>
     ),
 
-/*
+    /*
     publishTo <<= version { (v: String) =>
       val nexus = "https://oss.sonatype.org/"
       if (v.trim.endsWith("SNAPSHOT"))
@@ -176,8 +192,7 @@ object SparkBuild extends Build {
         Some("sonatype-staging"  at nexus + "service/local/staging/deploy/maven2")
     },
 
-*/
-
+    */
 
     libraryDependencies ++= Seq(
         "io.netty"          % "netty-all"       % "4.0.0.CR1",
@@ -264,7 +279,6 @@ object SparkBuild extends Build {
    libraryDependencies <+= scalaVersion(v => "org.scala-lang"  % "scala-reflect"  % v )
   )
 
-
   def examplesSettings = sharedSettings ++ Seq(
     name := "spark-examples",
     libraryDependencies ++= Seq(
@@ -302,21 +316,10 @@ object SparkBuild extends Build {
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     resolvers ++= Seq(
-      "Eclipse Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/",
       "Apache repo" at "https://repository.apache.org/content/repositories/releases"
     ),
-
     libraryDependencies ++= Seq(
-      "org.apache.flume"        % "flume-ng-sdk"     % "1.2.0" % "compile"     excludeAll(excludeNetty, excludeSnappy),
-      "com.sksamuel.kafka"     %% "kafka"            % "0.8.0-beta1"
-        exclude("com.sun.jdmk", "jmxtools")
-        exclude("com.sun.jmx", "jmxri")
-        exclude("net.sf.jopt-simple", "jopt-simple")
-        excludeAll(excludeNetty),
-      "org.eclipse.paho"        % "mqtt-client"      % "0.4.0",
-      "com.github.sgroschupf"   % "zkclient"         % "0.1"                   excludeAll(excludeNetty),
-      // "org.twitter4j"           % "twitter4j-stream" % "3.0.3"                 excludeAll(excludeNetty),
-      "org.spark-project.akka" %% "akka-zeromq"      % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
+      "commons-io" % "commons-io" % "2.4" 
     )
   )
 
@@ -331,8 +334,8 @@ object SparkBuild extends Build {
   def yarnEnabledSettings = Seq(
     libraryDependencies ++= Seq(
       // Exclude rule required for all ?
-      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
-      "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-client"      % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-yarn-api"    % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
       "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
       "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
     )
@@ -358,9 +361,45 @@ object SparkBuild extends Build {
   )
 
   def twitterSettings() = streamingSettings ++ Seq(
-    name := "spark-twitter",
+    name := "spark-streaming-twitter",
     libraryDependencies ++= Seq(
       "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
     )
   )
+  
+  def kafkaSettings() = streamingSettings ++ Seq(
+    name := "spark-streaming-kafka",
+    libraryDependencies ++= Seq(
+      "com.github.sgroschupf"    % "zkclient"   % "0.1"          excludeAll(excludeNetty),
+      "com.sksamuel.kafka"      %% "kafka"      % "0.8.0-beta1"
+        exclude("com.sun.jdmk", "jmxtools")
+        exclude("com.sun.jmx", "jmxri")
+        exclude("net.sf.jopt-simple", "jopt-simple")
+        excludeAll(excludeNetty)
+    )
+  )
+  
+  def flumeSettings() = streamingSettings ++ Seq(
+    name := "spark-streaming-flume",
+    libraryDependencies ++= Seq(
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy)
+    )
+  )
+
+  def zeromqSettings() = streamingSettings ++ Seq(
+    name := "spark-streaming-zeromq",
+    libraryDependencies ++= Seq(
+      "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
+    )
+  )
+
+  def mqttSettings() = streamingSettings ++ Seq(
+    name := "spark-streaming-mqtt",
+    resolvers ++= Seq(
+      "Apache repo" at "https://repository.apache.org/content/repositories/releases"
+    ),
+    libraryDependencies ++= Seq(
+      "org.eclipse.paho" % "mqtt-client" % "0.4.0"
+    )
+  )
 }
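With the build split above, each connector becomes its own artifact (spark-streaming-twitter, spark-streaming-kafka, spark-streaming-flume, spark-streaming-zeromq, spark-streaming-mqtt) under the org.apache.spark organization, so a downstream application pulls in only the connectors it actually uses. A hypothetical application build would declare something along these lines, with "x.y.z" standing in for whatever version eventually gets published:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"        % "x.y.z",  // placeholder version
      "org.apache.spark" %% "spark-streaming-zeromq" % "x.y.z"   // or -kafka, -flume, -mqtt, -twitter
    )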

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 25b9b70..41898b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,21 +17,6 @@
 
 package org.apache.spark.streaming
 
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-
-import org.apache.spark.streaming.dstream._
-
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.receivers.ActorReceiver
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.receivers.ZeroMQReceiver
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
-import org.apache.spark.streaming.receivers.ActorReceiver
-
 import scala.collection.mutable.Queue
 import scala.collection.Map
 import scala.reflect.ClassTag
@@ -40,15 +25,22 @@ import java.io.InputStream
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID
 
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.scheduler._
+
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
-//import twitter4j.Status
-//import twitter4j.auth.Authorization
-import org.apache.spark.streaming.scheduler._
-import akka.util.ByteString
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -224,74 +216,6 @@ class StreamingContext private (
   }
 
   /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
-   *                       and each frame has sequence of byte thus it needs the converter
-   *                       (which might be deserializer of bytes) to translate from sequence
-   *                       of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level. Defaults to memory-only.
-   */
-  def zeroMQStream[T: ClassTag](
-      publisherUrl:String,
-      subscribe: Subscribe,
-      bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
-      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
-    ): DStream[T] = {
-    actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
-        "ZeroMQReceiver", storageLevel, supervisorStrategy)
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *               in its own thread.
-   * @param storageLevel  Storage level to use for storing the received objects
-   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   */
-  def kafkaStream(
-      zkQuorum: String,
-      groupId: String,
-      topics: Map[String, Int],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
-    ): DStream[(String, String)] = {
-    val kafkaParams = Map[String, String](
-      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
-      "zookeeper.connection.timeout.ms" -> "10000")
-    kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
-      kafkaParams,
-      topics,
-      storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages from a Kafka Broker.
-   * @param kafkaParams Map of kafka configuration paramaters.
-   *                    See: http://kafka.apache.org/configuration.html
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *               in its own thread.
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def kafkaStream[
-    K: ClassTag,
-    V: ClassTag,
-    U <: kafka.serializer.Decoder[_]: Manifest,
-    T <: kafka.serializer.Decoder[_]: Manifest](
-      kafkaParams: Map[String, String],
-      topics: Map[String, Int],
-      storageLevel: StorageLevel
-    ): DStream[(K, V)] = {
-    val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel)
-    registerInputStream(inputStream)
-    inputStream
-  }
-
-  /**
    * Create a input stream from TCP source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
    * lines.
@@ -330,22 +254,6 @@ class StreamingContext private (
   }
 
   /**
-   * Create a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def flumeStream (
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
-    registerInputStream(inputStream)
-    inputStream
-  }
-
-  /**
    * Create a input stream from network source hostname:port, where data is received
    * as serialized blocks (serialized using the Spark's serializer) that can be directly
    * pushed into the block manager without deserializing them. This is the most efficient
@@ -467,21 +375,6 @@ class StreamingContext private (
     inputStream
   }
 
-/**
-   * Create an input stream that receives messages pushed by a mqtt publisher.
-   * @param brokerUrl Url of remote mqtt publisher
-   * @param topic topic name to subscribe to
-   * @param storageLevel RDD storage level. Defaults to memory-only.
-   */
-
-  def mqttStream(
-    brokerUrl: String,
-    topic: String,
-    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
-    val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
-    registerInputStream(inputStream)
-    inputStream
-  }
   /**
    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
    */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b32cfbb..ea4a0fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,28 +17,21 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.lang.{Integer => JInt}
-import java.io.InputStream
-import java.util.{Map => JMap, List => JList}
-
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
+import java.io.InputStream
+import java.util.{Map => JMap, List => JList}
+
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-//import twitter4j.Status
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-import akka.util.ByteString
-
-//import twitter4j.auth.Authorization
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler.StreamingListener
 
 /**
@@ -134,81 +127,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
 
   /**
-   * Create an input stream that pulls messages form a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   * in its own thread.
-   */
-  def kafkaStream(
-    zkQuorum: String,
-    groupId: String,
-    topics: JMap[String, JInt])
-  : JavaPairDStream[String, String] = {
-    implicit val cmt: ClassTag[String] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
-      StorageLevel.MEMORY_ONLY_SER_2)
-
-  }
-
-  /**
-   * Create an input stream that pulls messages form a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *               in its own thread.
-   * @param storageLevel RDD storage level. Defaults to memory-only
-   *
-   */
-  def kafkaStream(
-    zkQuorum: String,
-    groupId: String,
-    topics: JMap[String, JInt],
-    storageLevel: StorageLevel)
-  : JavaPairDStream[String, String] = {
-    implicit val cmt: ClassTag[String] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages form a Kafka Broker.
-   * @param keyTypeClass Key type of RDD
-   * @param valueTypeClass value type of RDD
-   * @param keyDecoderClass Type of kafka key decoder
-   * @param valueDecoderClass Type of kafka value decoder
-   * @param kafkaParams Map of kafka configuration paramaters.
-   *                    See: http://kafka.apache.org/configuration.html
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   * in its own thread.
-   * @param storageLevel RDD storage level. Defaults to memory-only
-   */
-  def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]](
-    keyTypeClass: Class[K],
-    valueTypeClass: Class[V],
-    keyDecoderClass: Class[U],
-    valueDecoderClass: Class[T],
-    kafkaParams: JMap[String, String],
-    topics: JMap[String, JInt],
-    storageLevel: StorageLevel)
-  : JavaPairDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val valueCmt: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
-    implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
-    implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
-    ssc.kafkaStream[K, V, U, T](
-      kafkaParams.toMap,
-      Map(topics.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /**
    * Create a input stream from network source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
    * lines.
@@ -319,98 +237,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
-    JavaDStream[SparkFlumeEvent] = {
-    ssc.flumeStream(hostname, port, storageLevel)
-  }
-
-
-  /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   */
-  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
-    ssc.flumeStream(hostname, port)
-  }
-  /*
-  /**
-   * Create a input stream that returns tweets received from Twitter.
-   * @param twitterAuth Twitter4J Authorization object
-   * @param filters Set of filter strings to get only those tweets that match them
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def twitterStream(
-      twitterAuth: Authorization,
-      filters: Array[String],
-      storageLevel: StorageLevel
-    ): JavaDStream[Status] = {
-    ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter using Twitter4J's default
-   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
-   * .consumerSecret, .accessToken and .accessTokenSecret to be set.
-   * @param filters Set of filter strings to get only those tweets that match them
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def twitterStream(
-      filters: Array[String],
-      storageLevel: StorageLevel
-    ): JavaDStream[Status] = {
-    ssc.twitterStream(None, filters, storageLevel)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter.
-   * @param twitterAuth Twitter4J Authorization
-   * @param filters Set of filter strings to get only those tweets that match them
-   */
-  def twitterStream(
-      twitterAuth: Authorization,
-      filters: Array[String]
-    ): JavaDStream[Status] = {
-    ssc.twitterStream(Some(twitterAuth), filters)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter using Twitter4J's default
-   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
-   * .consumerSecret, .accessToken and .accessTokenSecret to be set.
-   * @param filters Set of filter strings to get only those tweets that match them
-   */
-  def twitterStream(
-      filters: Array[String]
-    ): JavaDStream[Status] = {
-    ssc.twitterStream(None, filters)
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter.
-   * @param twitterAuth Twitter4J Authorization
-   */
-  def twitterStream(
-      twitterAuth: Authorization
-    ): JavaDStream[Status] = {
-    ssc.twitterStream(Some(twitterAuth))
-  }
-
-  /**
-   * Create a input stream that returns tweets received from Twitter using Twitter4J's default
-   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
-   * .consumerSecret, .accessToken and .accessTokenSecret to be set.
-   */
-  def twitterStream(): JavaDStream[Status] = {
-    ssc.twitterStream()
-  }
-  */
-  /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
@@ -473,70 +299,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def zeroMQStream[T](
-      publisherUrl:String,
-      subscribe: Subscribe,
-      bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
-      storageLevel: StorageLevel,
-      supervisorStrategy: SupervisorStrategy
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level. Defaults to memory-only.
-   */
-  def zeroMQStream[T](
-      publisherUrl:String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   */
-  def zeroMQStream[T](
-      publisherUrl:String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
-  }
-
-  /**
    * Registers an output stream that will be computed every interval
    */
   def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
deleted file mode 100644
index 60d7917..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import java.net.InetSocketAddress
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-import org.apache.flume.source.avro.AvroSourceProtocol
-import org.apache.flume.source.avro.AvroFlumeEvent
-import org.apache.flume.source.avro.Status
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.avro.ipc.NettyServer
-
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.util.Utils
-import org.apache.spark.storage.StorageLevel
-
-private[streaming]
-class FlumeInputDStream[T: ClassTag](
-  @transient ssc_ : StreamingContext,
-  host: String,
-  port: Int,
-  storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
-
-  override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
-  }
-}
-
-/**
- * A wrapper class for AvroFlumeEvent's with a custom serialization format.
- *
- * This is necessary because AvroFlumeEvent uses inner data structures
- * which are not serializable.
- */
-class SparkFlumeEvent() extends Externalizable {
-  var event : AvroFlumeEvent = new AvroFlumeEvent()
-
-  /* De-serialize from bytes. */
-  def readExternal(in: ObjectInput) {
-    val bodyLength = in.readInt()
-    val bodyBuff = new Array[Byte](bodyLength)
-    in.read(bodyBuff)
-
-    val numHeaders = in.readInt()
-    val headers = new java.util.HashMap[CharSequence, CharSequence]
-
-    for (i <- 0 until numHeaders) {
-      val keyLength = in.readInt()
-      val keyBuff = new Array[Byte](keyLength)
-      in.read(keyBuff)
-      val key : String = Utils.deserialize(keyBuff)
-
-      val valLength = in.readInt()
-      val valBuff = new Array[Byte](valLength)
-      in.read(valBuff)
-      val value : String = Utils.deserialize(valBuff)
-
-      headers.put(key, value)
-    }
-
-    event.setBody(ByteBuffer.wrap(bodyBuff))
-    event.setHeaders(headers)
-  }
-
-  /* Serialize to bytes. */
-  def writeExternal(out: ObjectOutput) {
-    val body = event.getBody.array()
-    out.writeInt(body.length)
-    out.write(body)
-
-    val numHeaders = event.getHeaders.size()
-    out.writeInt(numHeaders)
-    for ((k, v) <- event.getHeaders) {
-      val keyBuff = Utils.serialize(k.toString)
-      out.writeInt(keyBuff.length)
-      out.write(keyBuff)
-      val valBuff = Utils.serialize(v.toString)
-      out.writeInt(valBuff.length)
-      out.write(valBuff)
-    }
-  }
-}
-
-private[streaming] object SparkFlumeEvent {
-  def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
-    val event = new SparkFlumeEvent
-    event.event = in
-    event
-  }
-}
-
-/** A simple server that implements Flume's Avro protocol. */
-private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
-  override def append(event : AvroFlumeEvent) : Status = {
-    receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
-    Status.OK
-  }
-
-  override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
-    events.foreach (event =>
-      receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
-    Status.OK
-  }
-}
-
-/** A NetworkReceiver which listens for events using the
-  * Flume Avro interface.*/
-private[streaming]
-class FlumeReceiver(
-    host: String,
-    port: Int,
-    storageLevel: StorageLevel
-  ) extends NetworkReceiver[SparkFlumeEvent] {
-
-  lazy val blockGenerator = new BlockGenerator(storageLevel)
-
-  protected override def onStart() {
-    val responder = new SpecificResponder(
-      classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
-    blockGenerator.start()
-    server.start()
-    logInfo("Flume receiver started")
-  }
-
-  protected override def onStop() {
-    blockGenerator.stop()
-    logInfo("Flume receiver stopped")
-  }
-
-  override def getLocationPreference = Some(host)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
deleted file mode 100644
index 526f556..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
-
-import scala.collection.Map
-import scala.reflect.ClassTag
-
-/**
- * Input stream that pulls messages from a Kafka Broker.
- *
- * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- */
-private[streaming]
-class KafkaInputDStream[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
-    @transient ssc_ : StreamingContext,
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    storageLevel: StorageLevel
-  ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
-
-  def getReceiver(): NetworkReceiver[(K, V)] = {
-    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-        .asInstanceOf[NetworkReceiver[(K, V)]]
-  }
-}
-
-private[streaming]
-class KafkaReceiver[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any] {
-
-  // Handles pushing data into the BlockManager
-  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-  // Connection to Kafka
-  var consumerConnector : ConsumerConnector = null
-
-  def onStop() {
-    blockGenerator.stop()
-  }
-
-  def onStart() {
-
-    blockGenerator.start()
-
-    // Thread pool sized to the total number of topic threads that will handle Kafka messages
-    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
-    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
-
-    // Kafka connection properties
-    val props = new Properties()
-    kafkaParams.foreach(param => props.put(param._1, param._2))
-
-    // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
-    val consumerConfig = new ConsumerConfig(props)
-    consumerConnector = Consumer.create(consumerConfig)
-    logInfo("Connected to " + kafkaParams("zookeeper.connect"))
-
-    // When auto.offset.reset is defined, it is our responsibility to try to delete the
-    // consumer group's ZooKeeper node.
-    if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
-    }
-
-    val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[V]]
-
-    // Create threads for each topic/message stream we are listening to
-    val topicMessageStreams = consumerConnector.createMessageStreams(
-      topics, keyDecoder, valueDecoder)
-
-
-    // Start the messages handler for each partition
-    topicMessageStreams.values.foreach { streams =>
-      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
-    }
-  }
-
-  // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
-    extends Runnable {
-    def run() {
-      logInfo("Starting MessageHandler.")
-      for (msgAndMetadata <- stream) {
-        blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
-      }
-    }
-  }
-
-  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This is
-  // because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
-  //
-  // The Kafka high-level consumer doesn't currently expose a way to set offsets. This is a trick
-  // copied from Kafka's ConsoleConsumer; see the code related to 'autooffset.reset' when it is set
-  // to 'smallest'/'largest':
-  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
-  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
-    try {
-      val dir = "/consumers/" + groupId
-      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-      zk.deleteRecursive(dir)
-      zk.close()
-    } catch {
-      case _ : Throwable => // swallow
-    }
-  }
-}
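
Similarly, the Kafka input stream above now lives in the external kafka project. A minimal sketch of
creating the stream through that module, assuming it keeps the kafkaStream(zkQuorum, groupId, topics,
storageLevel) signature removed from core here (ZooKeeper host, group and topic are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka._

    object KafkaSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "KafkaSketch", Seconds(2))
        // topics maps each topic name to the number of consumer threads for it
        val lines = ssc.kafkaStream("zkhost:2181", "consumer-group", Map("my-topic" -> 1),
          StorageLevel.MEMORY_AND_DISK_SER_2)
        // kafkaStream yields (key, message) pairs; keep only the message payload
        lines.map(_._2).print()
        ssc.start()
      }
    }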

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
deleted file mode 100644
index ef4a737..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
-
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
-
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-/**
- * Input stream that subscribes to messages from an MQTT broker.
- * Uses the Eclipse Paho MqttClient: http://www.eclipse.org/paho/
- * @param brokerUrl URL of the remote MQTT broker
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-
-private[streaming] 
-class MQTTInputDStream[T: ClassTag](
-  @transient ssc_ : StreamingContext,
-  brokerUrl: String,
-  topic: String,
-  storageLevel: StorageLevel
-  ) extends NetworkInputDStream[T](ssc_) with Logging {
-  
-  def getReceiver(): NetworkReceiver[T] = {
-    new MQTTReceiver(brokerUrl, topic, storageLevel)
-      .asInstanceOf[NetworkReceiver[T]]
-  }
-}
-
-private[streaming] 
-class MQTTReceiver(brokerUrl: String,
-  topic: String,
-  storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any] {
-  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-  
-  def onStop() {
-    blockGenerator.stop()
-  }
-  
-  def onStart() {
-
-    blockGenerator.start()
-
-    // Set up in-memory persistence for messages
-    val persistence: MqttClientPersistence = new MemoryPersistence()
-
-    // Initialize the MqttClient with the brokerUrl, a client ID and the MqttClientPersistence
-    val client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", persistence)
-
-    // Connect to MqttBroker    
-    client.connect()
-
-    // Subscribe to Mqtt topic
-    client.subscribe(topic)
-
-    // The callback is triggered automatically whenever a new message arrives on the subscribed topic
-    val callback: MqttCallback = new MqttCallback() {
-
-      // Handles Mqtt message 
-      override def messageArrived(arg0: String, arg1: MqttMessage) {
-        blockGenerator += new String(arg1.getPayload())
-      }
-
-      override def deliveryComplete(arg0: IMqttDeliveryToken) {
-      }
-
-      override def connectionLost(arg0: Throwable) {
-        logInfo("Connection lost " + arg0)
-      }
-    }
-
-    // Set up callback for MqttClient
-    client.setCallback(callback)
-  }
-}
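
The MQTT stream moves out the same way. A minimal sketch, assuming the external mqtt project exposes
the mqttStream(brokerUrl, topic, storageLevel) call removed from core here (broker URL and topic name
are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.mqtt._

    object MQTTSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "MQTTSketch", Seconds(1))
        // Each MQTT message payload arrives as a UTF-8 string (see MQTTReceiver above)
        val messages = ssc.mqttStream("tcp://localhost:1883", "sensor/readings",
          StorageLevel.MEMORY_ONLY_SER)
        messages.print()
        ssc.start()
      }
    }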

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
deleted file mode 100644
index f164d51..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.receivers
-
-import scala.reflect.ClassTag
-
-import akka.actor.Actor
-import akka.util.ByteString
-import akka.zeromq._
-
-import org.apache.spark.Logging
-
-/**
- * A receiver to subscribe to ZeroMQ stream.
- */
-private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
-  subscribe: Subscribe,
-  bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
-  extends Actor with Receiver with Logging {
-
-  override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
-    Connect(publisherUrl), subscribe)
-
-  def receive: Receive = {
-
-    case Connecting ⇒ logInfo("connecting ...")
-
-    case m: ZMQMessage ⇒
-      logDebug("Received message for:" + m.frame(0))
-
-      // We ignore the first frame, as it is the topic
-      val bytes = m.frames.tail
-      pushBlock(bytesToObjects(bytes))
-
-    case Closed ⇒ logInfo("received closed ")
-
-  }
-}
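
For the ZeroMQ receiver above, the caller supplies the bytesToObjects function that turns the
remaining frames of each ZMQMessage into records. A small sketch of such a converter matching the
Seq[ByteString] => Iterator[T] signature the receiver expects (UTF-8 decoding is just one choice):

    import akka.util.ByteString

    // Decode every frame after the topic frame as one UTF-8 string record
    def bytesToStringIterator(frames: Seq[ByteString]): Iterator[String] =
      frames.map(_.utf8String).iterator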

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1cd0b9b..2734393 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -33,6 +33,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   val ssc = jobScheduler.ssc
   val clockClass = System.getProperty(
     "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+  logInfo("Using clock class = " + clockClass)
   val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
   val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
     longTime => generateJobs(new Time(longTime)))
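
The clock implementation is picked up from the spark.streaming.clock system property, which the test
suites later in this change toggle between a manual clock and the real clock. For reference, the
pattern the tests use:

    // Use the manual clock so tests can advance time deterministically ...
    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
    // ... or fall back to the default wall-clock implementation
    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")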

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index abff55d..4a8e15d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -160,7 +160,10 @@ class NetworkInputTracker(
       }
       // Run the dummy Spark job to ensure that all slaves have registered.
      // This prevents all the receivers from being scheduled on the same node.
-      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+      if (!ssc.sparkContext.isLocal) {
+        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+      }
+
 
       // Distribute the receivers and start them
       ssc.sparkContext.runJob(tempRDD, startReceiver)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index daeb99f..f4d26c0 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,22 +17,16 @@
 
 package org.apache.spark.streaming;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
-import kafka.serializer.StringDecoder;
+import scala.Tuple2;
 
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
+import java.io.*;
+import java.util.*;
 
-import scala.Tuple2;
-import twitter4j.Status;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -43,39 +37,11 @@ import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
-import org.apache.spark.streaming.JavaTestUtils;
-import org.apache.spark.streaming.JavaCheckpointTestUtils;
-
-import java.io.*;
-import java.util.*;
-
-import akka.actor.Props;
-import akka.zeromq.Subscribe;
-
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
 // see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
-  private transient JavaStreamingContext ssc;
-
-  @Before
-  public void setUp() {
-      System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-      ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
-    ssc.checkpoint("checkpoint");
-  }
-
-  @After
-  public void tearDown() {
-    ssc.stop();
-    ssc = null;
-
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port");
-  }
-
+public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
   @Test
   public void testCount() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -1597,26 +1563,6 @@ public class JavaAPISuite implements Serializable {
   // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
   // InputStream functionality is deferred to the existing Scala tests.
   @Test
-  public void testKafkaStream() {
-    HashMap<String, Integer> topics = Maps.newHashMap();
-    JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
-    JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
-      StorageLevel.MEMORY_AND_DISK());
-
-    HashMap<String, String> kafkaParams = Maps.newHashMap();
-    kafkaParams.put("zookeeper.connect","localhost:12345");
-    kafkaParams.put("group.id","consumer-group");
-    JavaPairDStream<String, String> test3 = ssc.kafkaStream(
-      String.class,
-      String.class,
-      StringDecoder.class,
-      StringDecoder.class,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_AND_DISK());
-  }
-
-  @Test
   public void testSocketTextStream() {
     JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
   }
@@ -1654,16 +1600,10 @@ public class JavaAPISuite implements Serializable {
   public void testRawSocketStream() {
     JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
   }
-
-  @Test
-  public void testFlumeStream() {
-    JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
-  }
-
+  /*
   @Test
   public void testFileStream() {
-    JavaPairDStream<String, String> foo =
-      ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
+    JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
   }
 
   @Test
@@ -1685,5 +1625,5 @@ public class JavaAPISuite implements Serializable {
         return null;
       }
     });
-  }
+  } */
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..34bee56
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        System.clearProperty("spark.driver.port");
+        System.clearProperty("spark.hostPort");
+        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+
+        // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+        System.clearProperty("spark.driver.port");
+        System.clearProperty("spark.hostPort");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 62a9f12..0cffed6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
-import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{NetworkReceiver}
 import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import util.ManualClock
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.Logging
 import scala.util.Random
 import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
-import org.apache.flume.source.avro.AvroSourceProtocol
-import org.apache.flume.source.avro.AvroFlumeEvent
-import org.apache.flume.source.avro.Status
-import org.apache.avro.ipc.{specific, NettyTransceiver}
-import org.apache.avro.ipc.specific.SpecificRequestor
-import java.nio.ByteBuffer
 import collection.JavaConversions._
-import java.nio.charset.Charset
 import com.google.common.io.Files
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -99,55 +92,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
 
-  test("flume input stream") {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    ssc.registerOutputStream(outputStream)
-    ssc.start()
-
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = Seq(1, 2, 3, 4, 5)
-    Thread.sleep(1000)
-    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
-
-    for (i <- 0 until input.size) {
-      val event = new AvroFlumeEvent
-      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
-      event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
-      client.append(event)
-      Thread.sleep(500)
-      clock.addToTime(batchDuration.milliseconds)
-    }
-
-    val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
-      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
-      Thread.sleep(100)
-    }
-    Thread.sleep(1000)
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
-
-    val decoder = Charset.forName("UTF-8").newDecoder()
-
-    assert(outputBuffer.size === input.length)
-    for (i <- 0 until outputBuffer.size) {
-      assert(outputBuffer(i).size === 1)
-      val str = decoder.decode(outputBuffer(i).head.event.getBody)
-      assert(str.toString === input(i).toString)
-      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
-    }
-  }
-
-
   test("file input stream") {
     // Disable manual clock as FileInputDStream does not work with manual clock
     System.clearProperty("spark.streaming.clock")
@@ -249,21 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
-  test("kafka input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val topics = Map("my-topic" -> 1)
-    val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
-    val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
-
-    // Test specifying decoder
-    val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3 = ssc.kafkaStream[
-      String,
-      String,
-      kafka.serializer.StringDecoder,
-      kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
-  }
-
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index e969e91..f56c046 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,11 +137,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   // if you want to add your stuff to "before" (i.e., don't call before { } )
   def beforeFunction() {
     if (useManualClock) {
-      System.setProperty(
-        "spark.streaming.clock",
-        "org.apache.spark.streaming.util.ManualClock"
-      )
+      logInfo("Using manual clock")
+      System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
     } else {
+      logInfo("Using real clock")
       System.clearProperty("spark.streaming.clock")
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
@@ -273,7 +272,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
       val startTime = System.currentTimeMillis()
       while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
         logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
-        Thread.sleep(100)
+        Thread.sleep(10)
       }
       val timeTaken = System.currentTimeMillis() - startTime