You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/08 07:22:12 UTC

[10/13] Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
deleted file mode 100644
index f4c75ab..0000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.actor.{Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.receivers._
-
-/**
- * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
- */
-class ZeroMQFunctions(ssc: StreamingContext) {
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
-   *                       and each frame has sequence of byte thus it needs the converter
-   *                       (which might be deserializer of bytes) to translate from sequence
-   *                       of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level. Defaults to memory-only.
-   */
-  def zeroMQStream[T: ClassTag](
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
-    ): DStream[T] = {
-    ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
-        "ZeroMQReceiver", storageLevel, supervisorStrategy)
-  }
-}
-  

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
new file mode 100644
index 0000000..546d9df
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+
+object ZeroMQUtils {
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param ssc            StreamingContext object
+   * @param publisherUrl   Url of remote zeromq publisher
+   * @param subscribe      Topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+   *                       and each frame has sequence of byte thus it needs the converter
+   *                       (which might be deserializer of bytes) to translate from sequence
+   *                       of sequence of bytes, where sequence refer to a frame
+   *                       and sub sequence refer to its payload.
+   * @param storageLevel   RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   */
+  def createStream[T: ClassTag](
+      ssc: StreamingContext,
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+    ): DStream[T] = {
+    ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+      "ZeroMQReceiver", storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param jssc           JavaStreamingContext object
+   * @param publisherUrl   Url of remote ZeroMQ publisher
+   * @param subscribe      Topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+   *                       of byte thus it needs the converter(which might be deserializer of bytes)
+   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
+   *                       and sub sequence refer to its payload.
+   * @param storageLevel  Storage level to use for storing the received objects
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param jssc           JavaStreamingContext object
+   * @param publisherUrl   Url of remote zeromq publisher
+   * @param subscribe      Topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+   *                       of byte thus it needs the converter(which might be deserializer of bytes)
+   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
+   *                       and sub sequence refer to its payload.
+   * @param storageLevel   RDD storage level.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param jssc           JavaStreamingContext object
+   * @param publisherUrl   Url of remote zeromq publisher
+   * @param subscribe      Topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+   *                       of byte thus it needs the converter(which might be deserializer of bytes)
+   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
+   *                       and sub sequence refer to its payload.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
deleted file mode 100644
index dc27178..0000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object zeromq {
-  implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index b020ae4..d2361e1 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,13 +17,10 @@
 
 package org.apache.spark.streaming.zeromq;
 
-import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
 import org.junit.Test;
-
 import akka.actor.SupervisorStrategy;
 import akka.util.ByteString;
 import akka.zeromq.Subscribe;
-
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -33,7 +30,6 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
 
   @Test // tests the API, does not actually test data receiving
   public void testZeroMQStream() {
-    ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
     String publishUrl = "abc";
     Subscribe subscribe = new Subscribe((ByteString)null);
     Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -43,11 +39,12 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
       }
     };
 
-    JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
-      publishUrl, subscribe, bytesToObjects);
-    JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
-      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
-      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
+    JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+      ssc, publishUrl, subscribe, bytesToObjects);
+    JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+      ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+      SupervisorStrategy.defaultStrategy());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aa99f226/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 5adcdb8..4193b8a 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -33,10 +33,10 @@ class ZeroMQStreamSuite extends TestSuiteBase {
     val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
 
     // tests the API, does not actually test data receiving
-    val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
-    val test2 = ssc.zeroMQStream(
-      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
-    val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
+    val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+    val test2 = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
       StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
 
     // TODO: Actually test data receiving