You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/08 07:22:04 UTC
[02/13] Refactored kafka, flume, zeromq,
mqtt as separate external projects, with their own self-contained scala API,
java API,
scala unit tests and java unit tests. Updated examples to use the external
projects.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
new file mode 100644
index 0000000..d7f6d35
--- /dev/null
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+import twitter4j.auth.{NullAuthorization, Authorization}
+
+class TwitterStreamSuite extends TestSuiteBase {
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val filters = Seq("filter1", "filter2")
+ val authorization: Authorization = NullAuthorization.getInstance()
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.twitterStream(None)
+ val test2 = ssc.twitterStream(None, filters)
+ val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4 = ssc.twitterStream(Some(authorization))
+ val test5 = ssc.twitterStream(Some(authorization), filters)
+ val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // Note that actually testing the data receiving is hard as authentication keys are
+ // necessary for accessing Twitter live stream
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
new file mode 100644
index 0000000..dc5d1f0
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating ZeroMQ input streams.
+ */
+class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
+ extends JavaStreamingContext(javaStreamingContext.ssc) {
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level.
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
new file mode 100644
index 0000000..f4c75ab
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.receivers._
+
+/**
+ * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
+ */
+class ZeroMQFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+ * and each frame has sequence of byte thus it needs the converter
+ * (which might be deserializer of bytes) to translate from sequence
+ * of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+ def zeroMQStream[T: ClassTag](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
new file mode 100644
index 0000000..769761e
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+
+import akka.actor.Actor
+import akka.util.ByteString
+import akka.zeromq._
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receivers._
+
+/**
+ * A receiver to subscribe to ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
+ extends Actor with Receiver with Logging {
+
+ override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
+ Connect(publisherUrl), subscribe)
+
+ def receive: Receive = {
+
+ case Connecting ⇒ logInfo("connecting ...")
+
+ case m: ZMQMessage ⇒
+ logDebug("Received message for:" + m.frame(0))
+
+ //We ignore first frame for processing as it is the topic
+ val bytes = m.frames.tail
+ pushBlock(bytesToObjects(bytes))
+
+ case Closed ⇒ logInfo("received closed ")
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
new file mode 100644
index 0000000..dc27178
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object zeromq {
+ implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
new file mode 100644
index 0000000..96af7d7
--- /dev/null
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq;
+
+import org.junit.Test;
+
+import akka.actor.SupervisorStrategy;
+import akka.util.ByteString;
+import akka.zeromq.Subscribe;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
+
+ @Test // tests the API, does not actually test data receiving
+ public void testZeroMQStream() {
+ JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
+ String publishUrl = "abc";
+ Subscribe subscribe = new Subscribe((ByteString)null);
+ Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
+ @Override
+ public Iterable<String> call(byte[][] bytes) throws Exception {
+ return null;
+ }
+ };
+
+ JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream(
+ publishUrl, subscribe, bytesToObjects);
+ JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream(
+ publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream(
+ publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
+
+ // To verify that JavaStreamingContextWithKafka is also StreamingContext
+ JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
new file mode 100644
index 0000000..063529a
--- /dev/null
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
new file mode 100644
index 0000000..5adcdb8
--- /dev/null
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+
+class ZeroMQStreamSuite extends TestSuiteBase {
+
+ test("zeromq input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val publishUrl = "abc"
+ val subscribe = new Subscribe(null.asInstanceOf[ByteString])
+ val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
+ val test2 = ssc.zeroMQStream(
+ publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
+ StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+
+ // TODO: Actually test data receiving
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f9ff781..4e92d2a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -49,9 +49,6 @@ object SparkBuild extends Build {
lazy val repl = Project("repl", file("repl"), settings = replSettings)
.dependsOn(core, bagel, mllib)
- lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, bagel, streaming, externalTwitter)
-
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
@@ -60,8 +57,6 @@ object SparkBuild extends Build {
lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
- lazy val externalTwitter = Project("streaming-twitter", file("external/twitter"), settings = twitterSettings) dependsOn(streaming)
-
lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
.dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
@@ -94,10 +89,31 @@ object SparkBuild extends Build {
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
+ lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+ lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+
+ lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
+ .dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
+
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
- lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj)
+ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@@ -167,7 +183,7 @@ object SparkBuild extends Build {
</issueManagement>
),
-/*
+ /*
publishTo <<= version { (v: String) =>
val nexus = "https://oss.sonatype.org/"
if (v.trim.endsWith("SNAPSHOT"))
@@ -176,8 +192,7 @@ object SparkBuild extends Build {
Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2")
},
-*/
-
+ */
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
@@ -264,7 +279,6 @@ object SparkBuild extends Build {
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
)
-
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
libraryDependencies ++= Seq(
@@ -302,21 +316,10 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
resolvers ++= Seq(
- "Eclipse Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/",
"Apache repo" at "https://repository.apache.org/content/repositories/releases"
),
-
libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
- "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
- exclude("com.sun.jdmk", "jmxtools")
- exclude("com.sun.jmx", "jmxri")
- exclude("net.sf.jopt-simple", "jopt-simple")
- excludeAll(excludeNetty),
- "org.eclipse.paho" % "mqtt-client" % "0.4.0",
- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
- // "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
+ "commons-io" % "commons-io" % "2.4"
)
)
@@ -331,8 +334,8 @@ object SparkBuild extends Build {
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
- "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
)
@@ -358,9 +361,45 @@ object SparkBuild extends Build {
)
def twitterSettings() = streamingSettings ++ Seq(
- name := "spark-twitter",
+ name := "spark-streaming-twitter",
libraryDependencies ++= Seq(
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
)
)
+
+ def kafkaSettings() = streamingSettings ++ Seq(
+ name := "spark-streaming-kafka",
+ libraryDependencies ++= Seq(
+ "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+ "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+ exclude("com.sun.jdmk", "jmxtools")
+ exclude("com.sun.jmx", "jmxri")
+ exclude("net.sf.jopt-simple", "jopt-simple")
+ excludeAll(excludeNetty)
+ )
+ )
+
+ def flumeSettings() = streamingSettings ++ Seq(
+ name := "spark-streaming-flume",
+ libraryDependencies ++= Seq(
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy)
+ )
+ )
+
+ def zeromqSettings() = streamingSettings ++ Seq(
+ name := "spark-streaming-zeromq",
+ libraryDependencies ++= Seq(
+ "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
+ )
+ )
+
+ def mqttSettings() = streamingSettings ++ Seq(
+ name := "spark-streaming-mqtt",
+ resolvers ++= Seq(
+ "Apache repo" at "https://repository.apache.org/content/repositories/releases"
+ ),
+ libraryDependencies ++= Seq(
+ "org.eclipse.paho" % "mqtt-client" % "0.4.0"
+ )
+ )
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 25b9b70..41898b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,21 +17,6 @@
package org.apache.spark.streaming
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-
-import org.apache.spark.streaming.dstream._
-
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.receivers.ActorReceiver
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.receivers.ZeroMQReceiver
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
-import org.apache.spark.streaming.receivers.ActorReceiver
-
import scala.collection.mutable.Queue
import scala.collection.Map
import scala.reflect.ClassTag
@@ -40,15 +25,22 @@ import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.scheduler._
+
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-//import twitter4j.Status
-//import twitter4j.auth.Authorization
-import org.apache.spark.streaming.scheduler._
-import akka.util.ByteString
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -224,74 +216,6 @@ class StreamingContext private (
}
/**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
- * and each frame has sequence of byte thus it needs the converter
- * (which might be deserializer of bytes) to translate from sequence
- * of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
- def zeroMQStream[T: ClassTag](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: Map[String, Int],
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
- ): DStream[(String, String)] = {
- val kafkaParams = Map[String, String](
- "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
- "zookeeper.connection.timeout.ms" -> "10000")
- kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
- kafkaParams,
- topics,
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def kafkaStream[
- K: ClassTag,
- V: ClassTag,
- U <: kafka.serializer.Decoder[_]: Manifest,
- T <: kafka.serializer.Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ): DStream[(K, V)] = {
- val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
-
- /**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
@@ -330,22 +254,6 @@ class StreamingContext private (
}
/**
- * Create a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream (
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
-
- /**
* Create a input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
@@ -467,21 +375,6 @@ class StreamingContext private (
inputStream
}
-/**
- * Create an input stream that receives messages pushed by a mqtt publisher.
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
-
- def mqttStream(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
- val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b32cfbb..ea4a0fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,28 +17,21 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Integer => JInt}
-import java.io.InputStream
-import java.util.{Map => JMap, List => JList}
-
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import java.io.InputStream
+import java.util.{Map => JMap, List => JList}
+
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-//import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-import akka.util.ByteString
-
-//import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.StreamingListener
/**
@@ -134,81 +127,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
/**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt])
- : JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
- StorageLevel.MEMORY_ONLY_SER_2)
-
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level. Defaults to memory-only
- *
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- storageLevel: StorageLevel)
- : JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
- * @param keyDecoderClass Type of kafka key decoder
- * @param valueDecoderClass Type of kafka value decoder
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level. Defaults to memory-only
- */
- def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]](
- keyTypeClass: Class[K],
- valueTypeClass: Class[V],
- keyDecoderClass: Class[U],
- valueDecoderClass: Class[T],
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel)
- : JavaPairDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
- ssc.kafkaStream[K, V, U, T](
- kafkaParams.toMap,
- Map(topics.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /**
* Create a input stream from network source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
* lines.
@@ -319,98 +237,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
- JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port, storageLevel)
- }
-
-
- /**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- */
- def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port)
- }
- /*
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaDStream[Status] = {
- ssc.twitterStream(None, filters, storageLevel)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String]
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(
- filters: Array[String]
- ): JavaDStream[Status] = {
- ssc.twitterStream(None, filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- */
- def twitterStream(
- twitterAuth: Authorization
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth))
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- */
- def twitterStream(): JavaDStream[Status] = {
- ssc.twitterStream()
- }
- */
- /**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
@@ -473,70 +299,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel,
- supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
- }
-
- /**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
deleted file mode 100644
index 60d7917..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import java.net.InetSocketAddress
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-import org.apache.flume.source.avro.AvroSourceProtocol
-import org.apache.flume.source.avro.AvroFlumeEvent
-import org.apache.flume.source.avro.Status
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.avro.ipc.NettyServer
-
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.util.Utils
-import org.apache.spark.storage.StorageLevel
-
-private[streaming]
-class FlumeInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- host: String,
- port: Int,
- storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
-
- override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel)
- }
-}
-
-/**
- * A wrapper class for AvroFlumeEvent's with a custom serialization format.
- *
- * This is necessary because AvroFlumeEvent uses inner data structures
- * which are not serializable.
- */
-class SparkFlumeEvent() extends Externalizable {
- var event : AvroFlumeEvent = new AvroFlumeEvent()
-
- /* De-serialize from bytes. */
- def readExternal(in: ObjectInput) {
- val bodyLength = in.readInt()
- val bodyBuff = new Array[Byte](bodyLength)
- in.read(bodyBuff)
-
- val numHeaders = in.readInt()
- val headers = new java.util.HashMap[CharSequence, CharSequence]
-
- for (i <- 0 until numHeaders) {
- val keyLength = in.readInt()
- val keyBuff = new Array[Byte](keyLength)
- in.read(keyBuff)
- val key : String = Utils.deserialize(keyBuff)
-
- val valLength = in.readInt()
- val valBuff = new Array[Byte](valLength)
- in.read(valBuff)
- val value : String = Utils.deserialize(valBuff)
-
- headers.put(key, value)
- }
-
- event.setBody(ByteBuffer.wrap(bodyBuff))
- event.setHeaders(headers)
- }
-
- /* Serialize to bytes. */
- def writeExternal(out: ObjectOutput) {
- val body = event.getBody.array()
- out.writeInt(body.length)
- out.write(body)
-
- val numHeaders = event.getHeaders.size()
- out.writeInt(numHeaders)
- for ((k, v) <- event.getHeaders) {
- val keyBuff = Utils.serialize(k.toString)
- out.writeInt(keyBuff.length)
- out.write(keyBuff)
- val valBuff = Utils.serialize(v.toString)
- out.writeInt(valBuff.length)
- out.write(valBuff)
- }
- }
-}
-
-private[streaming] object SparkFlumeEvent {
- def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
- val event = new SparkFlumeEvent
- event.event = in
- event
- }
-}
-
-/** A simple server that implements Flume's Avro protocol. */
-private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
- override def append(event : AvroFlumeEvent) : Status = {
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
- Status.OK
- }
-
- override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
- events.foreach (event =>
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
- Status.OK
- }
-}
-
-/** A NetworkReceiver which listens for events using the
- * Flume Avro interface.*/
-private[streaming]
-class FlumeReceiver(
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent] {
-
- lazy val blockGenerator = new BlockGenerator(storageLevel)
-
- protected override def onStart() {
- val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- val server = new NettyServer(responder, new InetSocketAddress(host, port))
- blockGenerator.start()
- server.start()
- logInfo("Flume receiver started")
- }
-
- protected override def onStop() {
- blockGenerator.stop()
- logInfo("Flume receiver stopped")
- }
-
- override def getLocationPreference = Some(host)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
deleted file mode 100644
index 526f556..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
-
-import scala.collection.Map
-import scala.reflect.ClassTag
-
-/**
- * Input stream that pulls messages from a Kafka Broker.
- *
- * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- */
-private[streaming]
-class KafkaInputDStream[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
- @transient ssc_ : StreamingContext,
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[(K, V)] = {
- new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- .asInstanceOf[NetworkReceiver[(K, V)]]
- }
-}
-
-private[streaming]
-class KafkaReceiver[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
-
- // Handles pushing data into the BlockManager
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
- // Connection to Kafka
- var consumerConnector : ConsumerConnector = null
-
- def onStop() {
- blockGenerator.stop()
- }
-
- def onStart() {
-
- blockGenerator.start()
-
- // In case we are using multiple Threads to handle Kafka Messages
- val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
- logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
-
- // Kafka connection properties
- val props = new Properties()
- kafkaParams.foreach(param => props.put(param._1, param._2))
-
- // Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
- val consumerConfig = new ConsumerConfig(props)
- consumerConnector = Consumer.create(consumerConfig)
- logInfo("Connected to " + kafkaParams("zookeeper.connect"))
-
- // When autooffset.reset is defined, it is our responsibility to try and whack the
- // consumer group zk node.
- if (kafkaParams.contains("auto.offset.reset")) {
- tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
- }
-
- val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[K]]
- val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[V]]
-
- // Create Threads for each Topic/Message Stream we are listening
- val topicMessageStreams = consumerConnector.createMessageStreams(
- topics, keyDecoder, valueDecoder)
-
-
- // Start the messages handler for each partition
- topicMessageStreams.values.foreach { streams =>
- streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
- }
- }
-
- // Handles Kafka Messages
- private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
- extends Runnable {
- def run() {
- logInfo("Starting MessageHandler.")
- for (msgAndMetadata <- stream) {
- blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
- }
- }
- }
-
- // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
- // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
- //
- // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
- // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
- // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
- private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
- try {
- val dir = "/consumers/" + groupId
- logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
- zk.deleteRecursive(dir)
- zk.close()
- } catch {
- case _ : Throwable => // swallow
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
deleted file mode 100644
index ef4a737..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
-
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
-
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-/**
- * Input stream that subscribe messages from a Mqtt Broker.
- * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-
-private[streaming]
-class MQTTInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel)
- .asInstanceOf[NetworkReceiver[T]]
- }
-}
-
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
- def onStop() {
- blockGenerator.stop()
- }
-
- def onStart() {
-
- blockGenerator.start()
-
- // Set up persistence for messages
- var peristance: MqttClientPersistence = new MemoryPersistence()
-
- // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
-
- // Connect to MqttBroker
- client.connect()
-
- // Subscribe to Mqtt topic
- client.subscribe(topic)
-
- // Callback automatically triggers as and when new message arrives on specified topic
- var callback: MqttCallback = new MqttCallback() {
-
- // Handles Mqtt message
- override def messageArrived(arg0: String, arg1: MqttMessage) {
- blockGenerator += new String(arg1.getPayload())
- }
-
- override def deliveryComplete(arg0: IMqttDeliveryToken) {
- }
-
- override def connectionLost(arg0: Throwable) {
- logInfo("Connection lost " + arg0)
- }
- }
-
- // Set up callback for MqttClient
- client.setCallback(callback)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
deleted file mode 100644
index f164d51..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.receivers
-
-import scala.reflect.ClassTag
-
-import akka.actor.Actor
-import akka.util.ByteString
-import akka.zeromq._
-
-import org.apache.spark.Logging
-
-/**
- * A receiver to subscribe to ZeroMQ stream.
- */
-private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
- extends Actor with Receiver with Logging {
-
- override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
- Connect(publisherUrl), subscribe)
-
- def receive: Receive = {
-
- case Connecting ⇒ logInfo("connecting ...")
-
- case m: ZMQMessage ⇒
- logDebug("Received message for:" + m.frame(0))
-
- //We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail
- pushBlock(bytesToObjects(bytes))
-
- case Closed ⇒ logInfo("received closed ")
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1cd0b9b..2734393 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -33,6 +33,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val ssc = jobScheduler.ssc
val clockClass = System.getProperty(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ logInfo("Using clock class = " + clockClass)
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateJobs(new Time(longTime)))
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index abff55d..4a8e15d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -160,7 +160,10 @@ class NetworkInputTracker(
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
- ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+ if (!ssc.sparkContext.isLocal) {
+ ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+ }
+
// Distribute the receivers and start them
ssc.sparkContext.runJob(tempRDD, startReceiver)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index daeb99f..f4d26c0 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,22 +17,16 @@
package org.apache.spark.streaming;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
-import kafka.serializer.StringDecoder;
+import scala.Tuple2;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
+import java.io.*;
+import java.util.*;
-import scala.Tuple2;
-import twitter4j.Status;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
@@ -43,39 +37,11 @@ import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
-import org.apache.spark.streaming.JavaTestUtils;
-import org.apache.spark.streaming.JavaCheckpointTestUtils;
-
-import java.io.*;
-import java.util.*;
-
-import akka.actor.Props;
-import akka.zeromq.Subscribe;
-
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
- private transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port");
- }
-
+public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -1597,26 +1563,6 @@ public class JavaAPISuite implements Serializable {
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@Test
- public void testKafkaStream() {
- HashMap<String, Integer> topics = Maps.newHashMap();
- JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
- StorageLevel.MEMORY_AND_DISK());
-
- HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect","localhost:12345");
- kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = ssc.kafkaStream(
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- kafkaParams,
- topics,
- StorageLevel.MEMORY_AND_DISK());
- }
-
- @Test
public void testSocketTextStream() {
JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@@ -1654,16 +1600,10 @@ public class JavaAPISuite implements Serializable {
public void testRawSocketStream() {
JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
-
- @Test
- public void testFlumeStream() {
- JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
- }
-
+ /*
@Test
public void testFileStream() {
- JavaPairDStream<String, String> foo =
- ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
+ JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
}
@Test
@@ -1685,5 +1625,5 @@ public class JavaAPISuite implements Serializable {
return null;
}
});
- }
+ } */
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..34bee56
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ System.clearProperty("spark.driver.port");
+ System.clearProperty("spark.hostPort");
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port");
+ System.clearProperty("spark.hostPort");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 62a9f12..0cffed6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
-import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{NetworkReceiver}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
-import org.apache.flume.source.avro.AvroSourceProtocol
-import org.apache.flume.source.avro.AvroFlumeEvent
-import org.apache.flume.source.avro.Status
-import org.apache.avro.ipc.{specific, NettyTransceiver}
-import org.apache.avro.ipc.specific.SpecificRequestor
-import java.nio.ByteBuffer
import collection.JavaConversions._
-import java.nio.charset.Charset
import com.google.common.io.Files
import java.util.concurrent.atomic.AtomicInteger
@@ -99,55 +92,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
- test("flume input stream") {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
- val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
- ssc.start()
-
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val input = Seq(1, 2, 3, 4, 5)
- Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver)
-
- for (i <- 0 until input.size) {
- val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
- event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
- client.append(event)
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- }
-
- val startTime = System.currentTimeMillis()
- while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
- Thread.sleep(100)
- }
- Thread.sleep(1000)
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- logInfo("Stopping context")
- ssc.stop()
-
- val decoder = Charset.forName("UTF-8").newDecoder()
-
- assert(outputBuffer.size === input.length)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- val str = decoder.decode(outputBuffer(i).head.event.getBody)
- assert(str.toString === input(i).toString)
- assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
- }
- }
-
-
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
System.clearProperty("spark.streaming.clock")
@@ -249,21 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val topics = Map("my-topic" -> 1)
- val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
- val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
-
- // Test specifying decoder
- val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = ssc.kafkaStream[
- String,
- String,
- kafka.serializer.StringDecoder,
- kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
- }
-
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f4e40661/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index e969e91..f56c046 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,11 +137,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// if you want to add your stuff to "before" (i.e., don't call before { } )
def beforeFunction() {
if (useManualClock) {
- System.setProperty(
- "spark.streaming.clock",
- "org.apache.spark.streaming.util.ManualClock"
- )
+ logInfo("Using manual clock")
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
} else {
+ logInfo("Using real clock")
System.clearProperty("spark.streaming.clock")
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
@@ -273,7 +272,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
- Thread.sleep(100)
+ Thread.sleep(10)
}
val timeTaken = System.currentTimeMillis() - startTime