You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/08 07:22:12 UTC
[10/13] Removed XYZFunctions and added XYZUtils as a common Scala and
Java interface for creating XYZ streams.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
deleted file mode 100644
index f4c75ab..0000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.actor.{Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.receivers._
-
-/**
- * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
- */
-class ZeroMQFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
- * and each frame has sequence of byte thus it needs the converter
- * (which might be deserializer of bytes) to translate from sequence
- * of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
- def zeroMQStream[T: ClassTag](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
new file mode 100644
index 0000000..546d9df
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+
+object ZeroMQUtils {
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param ssc StreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+ * and each frame has sequence of byte thus it needs the converter
+ * (which might be deserializer of bytes) to translate from sequence
+ * of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
deleted file mode 100644
index dc27178..0000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object zeromq {
- implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index b020ae4..d2361e1 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,13 +17,10 @@
package org.apache.spark.streaming.zeromq;
-import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
import org.junit.Test;
-
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
-
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -33,7 +30,6 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@Test // tests the API, does not actually test data receiving
public void testZeroMQStream() {
- ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -43,11 +39,12 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
+ JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects);
+ JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+ ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+ SupervisorStrategy.defaultStrategy());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 5adcdb8..4193b8a 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -33,10 +33,10 @@ class ZeroMQStreamSuite extends TestSuiteBase {
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
// tests the API, does not actually test data receiving
- val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
- val test2 = ssc.zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
+ val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test2 = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving