Posted to commits@bahir.apache.org by lr...@apache.org on 2018/11/30 11:06:23 UTC
bahir git commit: [BAHIR-182] Spark Streaming PubNub connector
Repository: bahir
Updated Branches:
refs/heads/master fb752570c -> 9373fa4e7
[BAHIR-182] Spark Streaming PubNub connector
Implement a new connector for PubNub (https://www.pubnub.com/),
which is increasingly popular as a cloud messaging infrastructure.
Closes #70
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/9373fa4e
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/9373fa4e
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/9373fa4e
Branch: refs/heads/master
Commit: 9373fa4e7feb402f5b85367174afc5ad6b593a04
Parents: fb75257
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Thu Nov 22 14:28:44 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Nov 30 12:03:11 2018 +0100
----------------------------------------------------------------------
README.md | 9 +-
pom.xml | 1 +
streaming-pubnub/README.md | 77 +++++++
.../streaming/pubnub/PubNubWordCount.scala | 91 +++++++++
streaming-pubnub/pom.xml | 78 +++++++
.../streaming/pubnub/PubNubInputDStream.scala | 201 +++++++++++++++++++
.../spark/streaming/pubnub/PubNubUtils.scala | 80 ++++++++
.../streaming/LocalJavaStreamingContext.java | 43 ++++
.../streaming/pubnub/JavaPubNubStreamSuite.java | 37 ++++
.../src/test/resources/log4j.properties | 33 +++
.../pubnub/MessageSerializationSuite.scala | 89 ++++++++
.../streaming/pubnub/PubNubStreamSuite.scala | 195 ++++++++++++++++++
12 files changed, 931 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ebbaea7..93a50ea 100644
--- a/README.md
+++ b/README.md
@@ -47,12 +47,15 @@ Each extension currently available in Apache Bahir has an example application lo
Currently, each submodule has its own README.md, with information on example usages and API.
+* [SQL Cloudant](https://github.com/apache/bahir/blob/master/sql-cloudant/README.md)
+* [SQL Streaming Akka](https://github.com/apache/bahir/blob/master/sql-streaming-akka/README.md)
* [SQL Streaming MQTT](https://github.com/apache/bahir/blob/master/sql-streaming-mqtt/README.md)
* [Streaming Akka](https://github.com/apache/bahir/blob/master/streaming-akka/README.md)
-* [Streaming Mqtt](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md)
-* [Streaming Zeromq](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md)
+* [Streaming MQTT](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md)
+* [Streaming PubNub](https://github.com/apache/bahir/blob/master/streaming-pubnub/README.md)
+* [Streaming Google Pub/Sub](https://github.com/apache/bahir/blob/master/streaming-pubsub/README.md)
* [Streaming Twitter](https://github.com/apache/bahir/blob/master/streaming-twitter/README.md)
-* [SQL Cloudant](sql-cloudant/README.md)
+* [Streaming ZeroMQ](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md)
Furthermore, to generate scaladocs for each module:
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 13407bd..ec419fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
<module>streaming-twitter</module>
<module>streaming-zeromq</module>
<module>streaming-pubsub</module>
+ <module>streaming-pubnub</module>
</modules>
<properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/README.md
----------------------------------------------------------------------
diff --git a/streaming-pubnub/README.md b/streaming-pubnub/README.md
new file mode 100644
index 0000000..3b4e9d3
--- /dev/null
+++ b/streaming-pubnub/README.md
@@ -0,0 +1,77 @@
+# Spark Streaming PubNub Connector
+
+A library for reading data from the real-time messaging infrastructure [PubNub](https://www.pubnub.com/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+ libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubnub" % "{{site.SPARK_VERSION}}"
+
+Using Maven:
+
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-streaming-pubnub_{{site.SCALA_BINARY_VERSION}}</artifactId>
+ <version>{{site.SPARK_VERSION}}</version>
+ </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+ $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubnub_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+## Examples
+
+The connector uses the official Java client for the PubNub cloud infrastructure. Import the `PubNubUtils`
+class and create an input stream by calling `PubNubUtils.createStream()`, as shown below. Security and performance
+features are set up in the standard `PNConfiguration` object. We advise configuring a reconnection policy so that
+temporary network outages do not interrupt the processing job. Users may subscribe to multiple channels and channel
+groups, and may specify a time token to start receiving messages from a given point in time.
+
+For complete code examples, please review the _examples_ directory.
+
+### Scala API
+
+ import com.pubnub.api.PNConfiguration
+ import com.pubnub.api.enums.PNReconnectionPolicy
+
+ import org.apache.spark.streaming.pubnub.{PubNubUtils, SparkPubNubMessage}
+
+ val config = new PNConfiguration
+ config.setSubscribeKey(subscribeKey)
+ config.setSecure(true)
+ config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR)
+ val channel = "my-channel"
+
+ val pubNubStream: ReceiverInputDStream[SparkPubNubMessage] = PubNubUtils.createStream(
+ ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+ )
+
+### Java API
+
+ import com.pubnub.api.PNConfiguration;
+ import com.pubnub.api.enums.PNReconnectionPolicy;
+
+ import org.apache.spark.streaming.pubnub.PubNubUtils;
+ import org.apache.spark.streaming.pubnub.SparkPubNubMessage;
+
+ PNConfiguration config = new PNConfiguration();
+ config.setSubscribeKey(subscribeKey);
+ config.setSecure(true);
+ config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR);
+ Set<String> channels = new HashSet<String>() {{
+ add("my-channel");
+ }};
+
+ JavaReceiverInputDStream<SparkPubNubMessage> pubNubStream = PubNubUtils.createStream(
+ ssc, config, channels, Collections.<String>emptySet(), null,
+ StorageLevel.MEMORY_AND_DISK_SER_2()
+ );
+
+## Unit Test
+
+Unit tests take advantage of the publicly available _demo_ subscription and publish keys, which have a limited request rate.
\ No newline at end of file
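The README above mentions starting a subscription from a time token. As a rough sketch, assuming the PubNub convention of a 17-digit timetoken counted in 100-nanosecond units (the same factor of 10000 that the tests in this commit apply to `System.currentTimeMillis`), a Unix timestamp in milliseconds could be converted as follows. `TimetokenSketch` and its method names are hypothetical and not part of the connector.

```scala
// Hypothetical helper, not part of the connector: converts between Unix
// milliseconds and PubNub timetokens (assumed to be 100 ns units).
object TimetokenSketch {
  private val TicksPerMilli = 10000L

  // Unix milliseconds -> timetoken.
  def fromUnixMillis(millis: Long): Long = millis * TicksPerMilli

  // Timetoken -> Unix milliseconds. Integer division drops the
  // sub-millisecond part of the timetoken.
  def toUnixMillis(timetoken: Long): Long = timetoken / TicksPerMilli
}
```

The result of `fromUnixMillis` could then be wrapped in `Some(...)` and passed as the `timeToken` argument of `PubNubUtils.createStream`.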
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala b/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala
new file mode 100644
index 0000000..fe8aa1e
--- /dev/null
+++ b/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming.pubnub
+
+import com.google.gson.JsonParser
+import com.pubnub.api.PNConfiguration
+import com.pubnub.api.enums.PNReconnectionPolicy
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.pubnub.{PubNubUtils, SparkPubNubMessage}
+
+/**
+ * Consumes messages from a PubNub channel and calculates word count.
+ * For demo purposes, log in to a PubNub account and produce messages using the Debug Console.
+ * Expected message format: {"text": "Hello, World!"}
+ *
+ * Usage: PubNubWordCount <subscribeKey> <channel> <aggregationPeriodMS>
+ * <subscribeKey> subscribe key
+ * <channel> channel
+ * <aggregationPeriodMS> aggregation period in milliseconds
+ *
+ * Example:
+ * $ bin/run-example \
+ * org.apache.spark.examples.streaming.pubnub.PubNubWordCount \
+ * sub-c-2d245192-ee8d-11e8-b4c3-46cd67be4fbd my-channel 60000
+ */
+object PubNubWordCount {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 3) {
+ // scalastyle:off println
+ System.err.println(
+ """
+ |Usage: PubNubWordCount <subscribeKey> <channel> <aggregationPeriodMS>
+ |
+ | <subscribeKey> subscribe key
+ | <channel> channel
+ | <aggregationPeriodMS> aggregation period in milliseconds
+ |
+ """.stripMargin
+ )
+ // scalastyle:on
+ System.exit(1)
+ }
+
+ val Seq(subscribeKey, channel, aggregationPeriod) = args.toSeq
+
+ val sparkConf = new SparkConf().setAppName("PubNubWordCount").setMaster("local[2]")
+ val ssc = new StreamingContext(sparkConf, Milliseconds(aggregationPeriod.toLong))
+
+ val config = new PNConfiguration
+ config.setSubscribeKey(subscribeKey)
+ config.setSecure(true)
+ config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR)
+
+ val pubNubStream: ReceiverInputDStream[SparkPubNubMessage] = PubNubUtils.createStream(
+ ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ val wordCounts = pubNubStream
+ .flatMap(
+ message => new JsonParser().parse(message.getPayload)
+ .getAsJsonObject.get("text").getAsString.split("\\s")
+ )
+ .map(word => (word, 1))
+ .reduceByKey(_ + _)
+
+ wordCounts.print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
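The tokenise-and-count step of the example can be tried without Spark or Gson. The sketch below runs the same flatMap / map / reduce-by-key shape on a plain Scala collection. It is an illustration under assumptions, not the example's code: `WordCountSketch` is a hypothetical name, the input is the already-extracted `text` value, and it splits on `"\\s+"` rather than the example's `"\\s"`, because `"\\s"` produces empty tokens when two whitespace characters are adjacent.

```scala
// Plain-collection sketch of the word-count step; no Spark or Gson involved.
object WordCountSketch {
  def countWords(texts: Seq[String]): Map[String, Int] =
    texts
      .flatMap(_.split("\\s+"))  // split on runs of whitespace
      .filter(_.nonEmpty)        // drop the empty token left by leading whitespace
      .map(word => (word, 1))
      .groupBy(_._1)             // stands in for reduceByKey on a DStream
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

Punctuation stays attached to the words, as in the original example, so `Hello,` and `Hello` are counted separately.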
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml
new file mode 100644
index 0000000..f233961
--- /dev/null
+++ b/streaming-pubnub/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>bahir-parent_2.11</artifactId>
+ <groupId>org.apache.bahir</groupId>
+ <version>2.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-streaming-pubnub_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-pubnub</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Apache Bahir - Spark Streaming PubNub</name>
+ <url>http://bahir.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.pubnub</groupId>
+ <artifactId>pubnub-gson</artifactId>
+ <version>4.21.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala
new file mode 100644
index 0000000..794784b
--- /dev/null
+++ b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import collection.JavaConverters._
+import com.google.gson.JsonParser
+import com.pubnub.api.PNConfiguration
+import com.pubnub.api.PubNub
+import com.pubnub.api.callbacks.SubscribeCallback
+import com.pubnub.api.enums.PNReconnectionPolicy
+import com.pubnub.api.models.consumer.PNStatus
+import com.pubnub.api.models.consumer.pubsub.PNMessageResult
+import com.pubnub.api.models.consumer.pubsub.PNPresenceEventResult
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
+
+private[streaming]
+class PubNubInputDStream(_ssc: StreamingContext,
+ val configuration: PNConfiguration,
+ val channels: Seq[String],
+ val channelGroups: Seq[String],
+ val timeToken: Option[Long],
+ val _storageLevel: StorageLevel)
+ extends ReceiverInputDStream[SparkPubNubMessage](_ssc) {
+ override def getReceiver(): Receiver[SparkPubNubMessage] = {
+ new PubNubReceiver(
+ new SparkPubNubNConfiguration(configuration), channels, channelGroups,
+ timeToken, _storageLevel
+ )
+ }
+}
+
+/**
+ * Wrapper class for PNConfiguration with only consumer-related, serializable properties.
+ * PubNub configuration model encapsulates various fields which are not serializable.
+ */
+private[pubnub]
+class SparkPubNubNConfiguration(configuration: PNConfiguration) extends Serializable {
+ var origin: String = configuration.getOrigin
+ var subscribeTimeout: Integer = configuration.getSubscribeTimeout
+ var secure: Boolean = configuration.isSecure
+ var subscribeKey: String = configuration.getSubscribeKey
+ var publishKey: String = configuration.getPublishKey
+ var cipherKey: String = configuration.getCipherKey
+ var authKey: String = configuration.getAuthKey
+ var uuid: String = configuration.getUuid
+ var connectTimeout: Integer = configuration.getConnectTimeout
+ var filterExpression: String = configuration.getFilterExpression
+ var reconnectionPolicy: PNReconnectionPolicy = configuration.getReconnectionPolicy
+ var maximumReconnectionRetries: Integer = configuration.getMaximumReconnectionRetries
+ var maximumConnections: Integer = configuration.getMaximumConnections
+
+ def toConfiguration: PNConfiguration = {
+ val config = new PNConfiguration()
+ config.setOrigin(origin)
+ config.setSubscribeTimeout(subscribeTimeout)
+ config.setSecure(secure)
+ config.setSubscribeKey(subscribeKey)
+ config.setPublishKey(publishKey)
+ config.setCipherKey(cipherKey)
+ config.setAuthKey(authKey)
+ config.setUuid(uuid)
+ config.setConnectTimeout(connectTimeout)
+ config.setFilterExpression(filterExpression)
+ config.setReconnectionPolicy(reconnectionPolicy)
+ config.setMaximumReconnectionRetries(maximumReconnectionRetries)
+ config.setMaximumConnections(maximumConnections)
+ config
+ }
+}
+
+/**
+ * Wrapper class for PNMessageResult with a custom serialization process.
+ * PubNub message model uses GSON objects which are not serializable.
+ */
+class SparkPubNubMessage extends Externalizable {
+ var message: PNMessageResult = _
+
+ // PubNub does not support sending empty messages.
+ def getPayload: String = message.getMessage.toString
+ def getChannel: String = message.getChannel
+ def getPublisher: String = message.getPublisher
+ def getSubscription: String = message.getSubscription
+ // Convert the PubNub timetoken (100 ns units) to a Unix timestamp in milliseconds.
+ def getTimestamp: Long = Math.ceil(message.getTimetoken / 10000).longValue()
+
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+ def writeVariableLength(data: Any): Unit = {
+ data match {
+ case null => out.writeInt(-1)
+ case d =>
+ val buffer = Utils.serialize(d)
+ out.writeInt(buffer.length)
+ out.write(buffer)
+ }
+ }
+
+ writeVariableLength(
+ if (message.getMessage != null) message.getMessage.toString else null
+ )
+ writeVariableLength(message.getChannel)
+ writeVariableLength(message.getPublisher)
+ writeVariableLength(message.getSubscription)
+
+ out.writeLong(message.getTimetoken)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+ def readVariableLength(): Any = {
+ in.readInt match {
+ case -1 => null
+ case length =>
+ val buffer = new Array[Byte](length)
+ in.readFully(buffer)
+ Utils.deserialize(buffer)
+ }
+ }
+
+ val parser = new JsonParser
+ val builder = PNMessageResult.builder
+
+ readVariableLength() match {
+ case null =>
+ case data => builder.message(parser.parse(data.asInstanceOf[String]))
+ }
+
+ builder.channel(readVariableLength().asInstanceOf[String])
+ builder.publisher(readVariableLength().asInstanceOf[String])
+ builder.subscription(readVariableLength().asInstanceOf[String])
+ builder.timetoken(in.readLong())
+
+ message = builder.build()
+ }
+}
+
+private[pubnub]
+class PubNubReceiver(configuration: SparkPubNubNConfiguration,
+ channels: Seq[String],
+ channelGroups: Seq[String],
+ timeToken: Option[Long],
+ storageLevel: StorageLevel)
+ extends Receiver[SparkPubNubMessage](storageLevel) with Logging {
+
+ var client: PubNub = _
+
+ override def onStart(): Unit = {
+ client = new PubNub(configuration.toConfiguration)
+ client.addListener(
+ new SubscribeCallback() {
+ def status(pubNub: PubNub, status: PNStatus): Unit = {
+ if (status.isError) {
+ log.error(s"Encountered PubNub error: $status.")
+ }
+ }
+
+ def message(pubNub: PubNub, message: PNMessageResult): Unit = {
+ val record = new SparkPubNubMessage
+ record.message = message
+ store(record)
+ }
+
+ def presence(pubNub: PubNub, presence: PNPresenceEventResult): Unit = {
+ }
+ }
+ )
+ val builder = client.subscribe()
+ .channels(channels.toList.asJava)
+ .channelGroups(channelGroups.toList.asJava)
+ if (timeToken.isDefined) {
+ builder.withTimetoken(timeToken.get)
+ }
+ builder.execute()
+ }
+
+ override def onStop(): Unit = {
+ client.unsubscribeAll()
+ client.destroy()
+ }
+}
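`SparkPubNubMessage` writes each field as a length followed by the serialized bytes, with a length of -1 standing for null. The sketch below shows that framing in isolation. It is a simplified stand-in, not the connector's code: it encodes strings as UTF-8 instead of calling Spark's `Utils.serialize`, and `NullableFieldCodec` with its methods is a hypothetical name.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// Sketch only: UTF-8 bytes are used where the connector uses Utils.serialize.
object NullableFieldCodec {
  def write(out: DataOutputStream, value: String): Unit =
    if (value == null) {
      out.writeInt(-1)             // -1 marks a null field
    } else {
      val bytes = value.getBytes(UTF_8)
      out.writeInt(bytes.length)   // length prefix
      out.write(bytes)             // payload
    }

  def read(in: DataInputStream): String =
    in.readInt() match {
      case -1 => null
      case length =>
        val buffer = new Array[Byte](length)
        in.readFully(buffer)
        new String(buffer, UTF_8)
    }

  // Writes all values to an in-memory buffer, then reads them back in order.
  def roundTrip(values: List[String]): List[String] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    values.foreach(write(out, _))
    out.flush()
    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    values.map(_ => read(in))
  }
}
```

The sentinel lets a null channel or subscription survive the round trip, which is what the "Message from channel" and "Message from subscription" tests in this commit exercise.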
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala
new file mode 100644
index 0000000..4aa47eb
--- /dev/null
+++ b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.util.{Set => JSet}
+
+import collection.JavaConverters._
+import com.pubnub.api.PNConfiguration
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object PubNubUtils {
+ /**
+ * Create an input stream that returns messages received from PubNub infrastructure.
+ * @param ssc Streaming context
+ * @param configuration PubNub client configuration
+ * @param channels Sequence of channels to subscribe
+ * @param channelGroups Sequence of channel groups to subscribe
+ * @param timeToken Optional point in time to start receiving messages from.
+ * Leave undefined to get only latest messages.
+ * @param storageLevel Storage level to use for storing the received objects
+ * @return Input stream
+ */
+ def createStream(
+ ssc: StreamingContext,
+ configuration: PNConfiguration,
+ channels: Seq[String],
+ channelGroups: Seq[String],
+ timeToken: Option[Long] = None,
+ storageLevel: StorageLevel): ReceiverInputDStream[SparkPubNubMessage] = {
+ ssc.withNamedScope("PubNub Stream") {
+ new PubNubInputDStream(
+ ssc, configuration, channels, channelGroups, timeToken, storageLevel
+ )
+ }
+ }
+
+ /**
+ * Create an input stream that returns messages received from PubNub infrastructure.
+ * @param jssc Java streaming context
+ * @param configuration PubNub client configuration
+ * @param channels Set of channels to subscribe
+ * @param channelGroups Set of channel groups to subscribe
+ * @param timeToken Optional point in time to start receiving messages from.
+ * Specify <code>null</code> to get only latest messages.
+ * @param storageLevel Storage level to use for storing the received objects
+ * @return Input stream
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ configuration: PNConfiguration,
+ channels: JSet[String],
+ channelGroups: JSet[String],
+ timeToken: Option[Long],
+ storageLevel: StorageLevel): JavaReceiverInputDStream[SparkPubNubMessage] = {
+ createStream(
+ jssc.ssc, configuration, Seq.empty ++ channels.asScala,
+ Seq.empty ++ channelGroups.asScala, timeToken, storageLevel
+ )
+ }
+}
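The Scaladoc of the Java overload says to pass `null` for the time token, while the parameter is declared as `Option[Long]` and the receiver later calls `timeToken.isDefined`. A Java `null` would therefore appear to reach that call as a null reference rather than as `None`. One possible way to bridge the two is to convert a nullable boxed value before delegating. The helper below is a hypothetical sketch of that conversion, not something this commit contains.

```scala
// Hypothetical helper: maps a nullable java.lang.Long to Option[Long].
object TimeTokenArg {
  def toOption(timeToken: java.lang.Long): Option[Long] =
    Option(timeToken).map(_.longValue)  // Option(null) is None
}
```

A Java caller that prefers not to rely on such a helper could pass `scala.Option.empty()` instead of `null`.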
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..448fb5e
--- /dev/null
+++ b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java
new file mode 100644
index 0000000..507b992
--- /dev/null
+++ b/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub;
+
+import com.pubnub.api.PNConfiguration;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+import java.util.HashSet;
+
+public class JavaPubNubStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testPubNubStream() {
+ // Tests API compatibility, but does not actually receive any data.
+ JavaReceiverInputDStream<SparkPubNubMessage> stream = PubNubUtils.createStream(
+ ssc, new PNConfiguration(), new HashSet<>(), new HashSet<>(), null,
+ StorageLevel.MEMORY_AND_DISK_SER_2()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/resources/log4j.properties b/streaming-pubnub/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bcb37d2
--- /dev/null
+++ b/streaming-pubnub/src/test/resources/log4j.properties
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=INFO, console, file
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala
new file mode 100644
index 0000000..9d1c3e9
--- /dev/null
+++ b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+
+import com.google.gson.JsonParser
+import com.pubnub.api.models.consumer.pubsub.PNMessageResult
+
+import org.apache.spark.SparkFunSuite
+
+class MessageSerializationSuite extends SparkFunSuite {
+ test("Full example") {
+ checkMessageSerialization(
+ "{\"message\":\"Hello, World!\"}", "channel1",
+ "publisher1", "subscription1", System.currentTimeMillis * 10000
+ )
+ }
+
+ test("Message from channel") {
+ checkMessageSerialization("{\"message\":\"Hello, World!\"}", "c", "p", null, 13534398158620385L)
+ }
+
+ test("Message from subscription") {
+ checkMessageSerialization("{\"message\":\"Hello, World!\"}", null, "p", "s", 13534397812467596L)
+ }
+
+ def checkMessageSerialization(payload: String, channel: String,
+ publisher: String, subscription: String, timestamp: Long): Unit = {
+ val builder = PNMessageResult.builder
+ .message(if (payload != null) new JsonParser().parse(payload) else null)
+ .channel(channel)
+ .publisher(publisher)
+ .subscription(subscription)
+ .timetoken(timestamp)
+ val pubNubMessage = builder.build()
+ val sparkMessage = new SparkPubNubMessage
+ sparkMessage.message = pubNubMessage
+
+ // serialize
+ val byteOutStream = new ByteArrayOutputStream
+ val outputStream = new ObjectOutputStream(byteOutStream)
+ outputStream.writeObject(sparkMessage)
+ outputStream.flush()
+ outputStream.close()
+ byteOutStream.close()
+ val serializedBytes = byteOutStream.toByteArray
+
+ // deserialize
+ val byteInStream = new ByteArrayInputStream(serializedBytes)
+ val inputStream = new ObjectInputStream(byteInStream)
+ val deserializedMessage = inputStream.readObject().asInstanceOf[SparkPubNubMessage]
+ inputStream.close()
+ byteInStream.close()
+
+ assert(payload.equals(deserializedMessage.getPayload))
+ if (channel != null) {
+ assert(channel.equals(deserializedMessage.getChannel))
+ } else {
+ assert(deserializedMessage.getChannel == null)
+ }
+ if (subscription != null) {
+ assert(subscription.equals(deserializedMessage.getSubscription))
+ } else {
+ assert(deserializedMessage.getSubscription == null)
+ }
+ assert(publisher.equals(deserializedMessage.getPublisher))
+ val unixTimestamp = Math.ceil(timestamp / 10000).longValue()
+ assert(unixTimestamp.equals(deserializedMessage.getTimestamp))
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
new file mode 100644
index 0000000..aa461db
--- /dev/null
+++ b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.util.{Map => JMap}
+import java.util.UUID
+
+import collection.JavaConverters._
+import com.google.gson.JsonObject
+import com.pubnub.api.{PNConfiguration, PubNub, PubNubException}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time
+import org.scalatest.time.Span
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+
+class PubNubStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
+ val subscribeKey = "demo"
+ val publishKey = "demo"
+ val channel = "test"
+
+ var ssc: StreamingContext = _
+ var configuration: PNConfiguration = _
+ var client: PubNub = _
+
+ override def beforeAll(): Unit = {
+ configuration = new PNConfiguration()
+ configuration.setSubscribeKey(subscribeKey)
+ configuration.setPublishKey(publishKey)
+ client = new PubNub(configuration)
+ }
+
+ override def afterAll(): Unit = {
+ client.destroy()
+ }
+
+ before {
+ ssc = new StreamingContext("local[2]", this.getClass.getSimpleName, Seconds(1))
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ }
+
+ test("Stream receives messages") {
+ val nbOfMsg = 5
+ var publishedMessages: List[JsonObject] = List()
+ @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+ val receiveStream = PubNubUtils.createStream(
+ ssc, configuration, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+ )
+ receiveStream.foreachRDD { rdd =>
+ if (rdd.collect().length > 0) {
+ receivedMessages = receivedMessages ++ List(rdd.first)
+ receivedMessages
+ }
+ }
+ ssc.start()
+
+ (1 to nbOfMsg).foreach(
+ _ => {
+ val message = new JsonObject()
+ message.addProperty("text", UUID.randomUUID().toString)
+ publishedMessages = message :: publishedMessages
+ }
+ )
+
+ eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+ publishedMessages.foreach(
+ m => if (!receivedMessages.map(r => r.getPayload).contains(m.toString)) publishMessage(m)
+ )
+ assert(
+ publishedMessages.map(m => m.toString).toSet
+ .subsetOf(receivedMessages.map(m => m.getPayload))
+ )
+ assert(channel.equals(receivedMessages.head.getChannel))
+ }
+ }
+
+ test("Message filtering") {
+ val config = new PNConfiguration()
+ config.setSubscribeKey(subscribeKey)
+ config.setPublishKey(publishKey)
+ config.setFilterExpression("language == 'english'")
+
+ @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+ val receiveStream = PubNubUtils.createStream(
+ ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+ )
+ receiveStream.foreachRDD { rdd =>
+ if (rdd.collect().length > 0) {
+ receivedMessages = receivedMessages ++ List(rdd.first)
+ receivedMessages
+ }
+ }
+ ssc.start()
+
+ eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+ val polishMessage = new JsonObject()
+ polishMessage.addProperty("text", "dzien dobry")
+ publishMessage(polishMessage, Map("language" -> "polish").asJava)
+
+ val englishMessage = new JsonObject()
+ englishMessage.addProperty("text", "good morning")
+ publishMessage(englishMessage, Map("language" -> "english").asJava)
+
+ assert(receivedMessages.map(m => m.getPayload).size == 1)
+ assert(receivedMessages.head.getPayload.equals(englishMessage.toString))
+ }
+ }
+
+ test("Test time token") {
+ val config = new PNConfiguration()
+ config.setSubscribeKey(subscribeKey)
+ config.setPublishKey(publishKey)
+
+ @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+ val currentTimeToken = client.time().sync().getTimetoken
+
+ // Register the subscriber with a time token 5 seconds ahead, so it skips the first message.
+ val receiveStream = PubNubUtils.createStream(
+ ssc, config, Seq(channel), Seq(), Some(currentTimeToken + 5000*10000),
+ StorageLevel.MEMORY_AND_DISK_SER_2
+ )
+ receiveStream.foreachRDD { rdd =>
+ if (rdd.collect().length > 0) {
+ receivedMessages = receivedMessages ++ List(rdd.first)
+ receivedMessages
+ }
+ }
+ ssc.start()
+
+ // Give time for subscription to successfully register.
+ Thread.sleep(1000)
+
+ // Make sure we publish the message. Hopefully it will not take more than 5 seconds.
+ // Otherwise we may see the message and the test will fail.
+ val message = new JsonObject()
+ message.addProperty("text", "past")
+ var timeToken = -1L
+ while (timeToken == -1L) {
+ timeToken = publishMessage(message = message, store = true)
+ Thread.sleep(500)
+ }
+
+ eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+ val message = new JsonObject()
+ message.addProperty("text", "future")
+ publishMessage(message)
+
+ assert(receivedMessages.map(m => m.getPayload).size == 1)
+ assert(receivedMessages.head.getPayload.equals(message.toString))
+ }
+ }
+
+ def publishMessage(message: JsonObject,
+ metadata: JMap[String, String] = Map.empty[String, String].asJava,
+ store: Boolean = false): Long = {
+ try {
+ client.publish().channel(channel).meta(metadata)
+ .message(message).shouldStore(store).sync().getTimetoken
+ } catch {
+ case e: PubNubException =>
+ if (!e.getErrormsg.contains("Account quota exceeded (2/1000000)")) {
+ // Rethrow anything except the demo account quota limit, which the caller retries.
+ throw new RuntimeException(e)
+ }
+ -1
+ }
+ }
+}