Posted to commits@bahir.apache.org by lr...@apache.org on 2018/11/30 11:06:23 UTC

bahir git commit: [BAHIR-182] Spark Streaming PubNub connector

Repository: bahir
Updated Branches:
  refs/heads/master fb752570c -> 9373fa4e7


[BAHIR-182] Spark Streaming PubNub connector

Implement a new connector for PubNub (https://www.pubnub.com/),
which is growing in popularity as a cloud messaging infrastructure.

Closes #70


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/9373fa4e
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/9373fa4e
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/9373fa4e

Branch: refs/heads/master
Commit: 9373fa4e7feb402f5b85367174afc5ad6b593a04
Parents: fb75257
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Thu Nov 22 14:28:44 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Nov 30 12:03:11 2018 +0100

----------------------------------------------------------------------
 README.md                                       |   9 +-
 pom.xml                                         |   1 +
 streaming-pubnub/README.md                      |  77 +++++++
 .../streaming/pubnub/PubNubWordCount.scala      |  91 +++++++++
 streaming-pubnub/pom.xml                        |  78 +++++++
 .../streaming/pubnub/PubNubInputDStream.scala   | 201 +++++++++++++++++++
 .../spark/streaming/pubnub/PubNubUtils.scala    |  80 ++++++++
 .../streaming/LocalJavaStreamingContext.java    |  43 ++++
 .../streaming/pubnub/JavaPubNubStreamSuite.java |  37 ++++
 .../src/test/resources/log4j.properties         |  33 +++
 .../pubnub/MessageSerializationSuite.scala      |  89 ++++++++
 .../streaming/pubnub/PubNubStreamSuite.scala    | 195 ++++++++++++++++++
 12 files changed, 931 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ebbaea7..93a50ea 100644
--- a/README.md
+++ b/README.md
@@ -47,12 +47,15 @@ Each extension currently available in Apache Bahir has an example application lo
 
 Currently, each submodule has its own README.md, with information on example usages and API.
 
+* [SQL Cloudant](https://github.com/apache/bahir/blob/master/sql-cloudant/README.md)
+* [SQL Streaming Akka](https://github.com/apache/bahir/blob/master/sql-streaming-akka/README.md)
 * [SQL Streaming MQTT](https://github.com/apache/bahir/blob/master/sql-streaming-mqtt/README.md)
 * [Streaming Akka](https://github.com/apache/bahir/blob/master/streaming-akka/README.md)
-* [Streaming Mqtt](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md)
-* [Streaming Zeromq](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md)
+* [Streaming MQTT](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md)
+* [Streaming PubNub](https://github.com/apache/bahir/blob/master/streaming-pubnub/README.md)
+* [Streaming Google Pub/Sub](https://github.com/apache/bahir/blob/master/streaming-pubsub/README.md)
 * [Streaming Twitter](https://github.com/apache/bahir/blob/master/streaming-twitter/README.md)
-* [SQL Cloudant](sql-cloudant/README.md)
+* [Streaming ZeroMQ](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md)
 
 Furthermore, to generate scaladocs for each module:
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 13407bd..ec419fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
     <module>streaming-twitter</module>
     <module>streaming-zeromq</module>
     <module>streaming-pubsub</module>
+    <module>streaming-pubnub</module>
   </modules>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/README.md
----------------------------------------------------------------------
diff --git a/streaming-pubnub/README.md b/streaming-pubnub/README.md
new file mode 100644
index 0000000..3b4e9d3
--- /dev/null
+++ b/streaming-pubnub/README.md
@@ -0,0 +1,77 @@
+# Spark Streaming PubNub Connector
+
+A library for reading data from the [PubNub](https://www.pubnub.com/) real-time messaging infrastructure using Spark Streaming.
+
+## Linking
+
+Using SBT:
+    
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubnub" % "{{site.SPARK_VERSION}}"
+    
+Using Maven:
+    
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-pubnub_{{site.SCALA_BINARY_VERSION}}</artifactId>
+        <version>{{site.SPARK_VERSION}}</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubnub_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
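+
+For example, to submit a hypothetical application jar (the jar name here is only illustrative):
+
+    $ bin/spark-submit --packages org.apache.bahir:spark-streaming-pubnub_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} my-pubnub-app.jar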
+
+## Examples
+
+The connector leverages the official Java client for the PubNub cloud infrastructure. You can import the `PubNubUtils`
+class and create an input stream by calling `PubNubUtils.createStream()` as shown below. Security and performance
+related features should be set up inside the standard `PNConfiguration` object. We advise configuring the reconnection
+policy so that temporary network outages do not interrupt the processing job. Users may subscribe to multiple channels
+and channel groups, as well as specify a time token to start receiving messages from a given point in time.
+
+For complete code examples, please review the _examples_ directory.
+
+### Scala API
+
+    import com.pubnub.api.PNConfiguration
+    import com.pubnub.api.enums.PNReconnectionPolicy
+
+    import org.apache.spark.storage.StorageLevel
+    import org.apache.spark.streaming.dstream.ReceiverInputDStream
+    import org.apache.spark.streaming.pubnub.{PubNubUtils, SparkPubNubMessage}
+
+    val config = new PNConfiguration
+    config.setSubscribeKey(subscribeKey)
+    config.setSecure(true)
+    config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR)
+    val channel = "my-channel"
+
+    val pubNubStream: ReceiverInputDStream[SparkPubNubMessage] = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+
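+To start reading from a given point in time rather than only the latest messages, pass a time token
+instead of `None`. A minimal sketch, assuming the `ssc`, `config` and `channel` values from the snippet above:
+
+    // Hypothetical example: resume roughly 5 seconds in the past.
+    // PubNub time tokens count 100-nanosecond intervals since the Unix epoch,
+    // i.e. 10,000 ticks per millisecond.
+    val fiveSecondsAgo = (System.currentTimeMillis - 5000) * 10000
+    val replayStream = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), Some(fiveSecondsAgo),
+      StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+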
+### Java API
+
+    import java.util.Collections;
+    import java.util.HashSet;
+    import java.util.Set;
+
+    import com.pubnub.api.PNConfiguration;
+    import com.pubnub.api.enums.PNReconnectionPolicy;
+
+    import org.apache.spark.storage.StorageLevel;
+    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+    import org.apache.spark.streaming.pubnub.PubNubUtils;
+    import org.apache.spark.streaming.pubnub.SparkPubNubMessage;
+
+    PNConfiguration config = new PNConfiguration();
+    config.setSubscribeKey(subscribeKey);
+    config.setSecure(true);
+    config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR);
+    Set<String> channels = new HashSet<String>() {{
+        add("my-channel");
+    }};
+
+    JavaReceiverInputDStream<SparkPubNubMessage> pubNubStream = PubNubUtils.createStream(
+        ssc, config, channels, Collections.EMPTY_SET, null,
+        StorageLevel.MEMORY_AND_DISK_SER_2()
+    );
+
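+The complete word count example from the _examples_ directory can be launched with `run-example`,
+for instance (this invocation is taken from the example's own documentation):
+
+    $ bin/run-example \
+        org.apache.spark.examples.streaming.pubnub.PubNubWordCount \
+        sub-c-2d245192-ee8d-11e8-b4c3-46cd67be4fbd my-channel 60000
+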
+## Unit Test
+
+Unit tests take advantage of the publicly available _demo_ subscribe and publish keys, which have a limited request rate.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala b/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala
new file mode 100644
index 0000000..fe8aa1e
--- /dev/null
+++ b/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming.pubnub
+
+import com.google.gson.JsonParser
+import com.pubnub.api.PNConfiguration
+import com.pubnub.api.enums.PNReconnectionPolicy
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.pubnub.{PubNubUtils, SparkPubNubMessage}
+
+/**
+ * Consumes messages from a PubNub channel and calculates word count.
+ * For demo purposes, log in to a PubNub account and produce messages using the Debug Console.
+ * Expected message format: {"text": "Hello, World!"}
+ *
+ * Usage: PubNubWordCount <subscribeKey> <channel> <aggregationPeriodMS>
+ *   <subscribeKey> subscribe key
+ *   <channel> channel
+ *   <aggregationPeriodMS> aggregation period in milliseconds
+ *
+ * Example:
+ *  $ bin/run-example \
+ *      org.apache.spark.examples.streaming.pubnub.PubNubWordCount \
+ *         sub-c-2d245192-ee8d-11e8-b4c3-46cd67be4fbd my-channel 60000
+ */
+object PubNubWordCount {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 3) {
+      // scalastyle:off println
+      System.err.println(
+        """
+          |Usage: PubNubWordCount <subscribeKey> <channel> <aggregationPeriodMS>
+          |
+          |     <subscribeKey>        subscribe key
+          |     <channel>             channel
+          |     <aggregationPeriodMS> aggregation period in milliseconds
+          |
+        """.stripMargin
+      )
+      // scalastyle:on
+      System.exit(1)
+    }
+
+    val Seq(subscribeKey, channel, aggregationPeriod) = args.toSeq
+
+    val sparkConf = new SparkConf().setAppName("PubNubWordCount").setMaster("local[2]")
+    val ssc = new StreamingContext(sparkConf, Milliseconds(aggregationPeriod.toLong))
+
+    val config = new PNConfiguration
+    config.setSubscribeKey(subscribeKey)
+    config.setSecure(true)
+    config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR)
+
+    val pubNubStream: ReceiverInputDStream[SparkPubNubMessage] = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    val wordCounts = pubNubStream
+        .flatMap(
+          message => new JsonParser().parse(message.getPayload)
+            .getAsJsonObject.get("text").getAsString.split("\\s")
+        )
+        .map(word => (word, 1))
+        .reduceByKey(_ + _)
+
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml
new file mode 100644
index 0000000..f233961
--- /dev/null
+++ b/streaming-pubnub/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>bahir-parent_2.11</artifactId>
+    <groupId>org.apache.bahir</groupId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-streaming-pubnub_2.11</artifactId>
+  <properties>
+    <sbt.project.name>streaming-pubnub</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Apache Bahir - Spark Streaming PubNub</name>
+  <url>http://bahir.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.pubnub</groupId>
+      <artifactId>pubnub-gson</artifactId>
+      <version>4.21.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala
new file mode 100644
index 0000000..794784b
--- /dev/null
+++ b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import collection.JavaConverters._
+import com.google.gson.JsonParser
+import com.pubnub.api.PNConfiguration
+import com.pubnub.api.PubNub
+import com.pubnub.api.callbacks.SubscribeCallback
+import com.pubnub.api.enums.PNReconnectionPolicy
+import com.pubnub.api.models.consumer.PNStatus
+import com.pubnub.api.models.consumer.pubsub.PNMessageResult
+import com.pubnub.api.models.consumer.pubsub.PNPresenceEventResult
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
+
+private[streaming]
+class PubNubInputDStream(_ssc: StreamingContext,
+    val configuration: PNConfiguration,
+    val channels: Seq[String],
+    val channelGroups: Seq[String],
+    val timeToken: Option[Long],
+    val _storageLevel: StorageLevel)
+  extends ReceiverInputDStream[SparkPubNubMessage](_ssc) {
+  override def getReceiver(): Receiver[SparkPubNubMessage] = {
+    new PubNubReceiver(
+      new SparkPubNubNConfiguration(configuration), channels, channelGroups,
+      timeToken, _storageLevel
+    )
+  }
+}
+
+/**
+ * Wrapper class for PNConfiguration with only consumer-related, serializable properties.
+ * The PubNub configuration model encapsulates various fields which are not serializable.
+ */
+private[pubnub]
+class SparkPubNubNConfiguration(configuration: PNConfiguration) extends Serializable {
+  var origin: String = configuration.getOrigin
+  var subscribeTimeout: Integer = configuration.getSubscribeTimeout
+  var secure: Boolean = configuration.isSecure
+  var subscribeKey: String = configuration.getSubscribeKey
+  var publishKey: String = configuration.getPublishKey
+  var cipherKey: String = configuration.getCipherKey
+  var authKey: String = configuration.getAuthKey
+  var uuid: String = configuration.getUuid
+  var connectTimeout: Integer = configuration.getConnectTimeout
+  var filterExpression: String = configuration.getFilterExpression
+  var reconnectionPolicy: PNReconnectionPolicy = configuration.getReconnectionPolicy
+  var maximumReconnectionRetries: Integer = configuration.getMaximumReconnectionRetries
+  var maximumConnections: Integer = configuration.getMaximumConnections
+
+  def toConfiguration: PNConfiguration = {
+    val config = new PNConfiguration()
+    config.setOrigin(origin)
+    config.setSubscribeTimeout(subscribeTimeout)
+    config.setSecure(secure)
+    config.setSubscribeKey(subscribeKey)
+    config.setPublishKey(publishKey)
+    config.setCipherKey(cipherKey)
+    config.setAuthKey(authKey)
+    config.setUuid(uuid)
+    config.setConnectTimeout(connectTimeout)
+    config.setFilterExpression(filterExpression)
+    config.setReconnectionPolicy(reconnectionPolicy)
+    config.setMaximumReconnectionRetries(maximumReconnectionRetries)
+    config.setMaximumConnections(maximumConnections)
+    config
+  }
+}
+
+/**
+ * Wrapper class for PNMessageResult with a custom serialization process.
+ * The PubNub message model uses GSON objects, which are not serializable.
+ */
+class SparkPubNubMessage extends Externalizable {
+  var message: PNMessageResult = _
+
+  // PubNub does not support sending empty messages.
+  def getPayload: String = message.getMessage.toString
+  def getChannel: String = message.getChannel
+  def getPublisher: String = message.getPublisher
+  def getSubscription: String = message.getSubscription
+  // Convert the PubNub time token (100 ns units) to a Unix timestamp in milliseconds.
+  def getTimestamp: Long = Math.ceil(message.getTimetoken / 10000).longValue()
+
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+    def writeVariableLength(data: Any): Unit = {
+      data match {
+        case null => out.writeInt(-1)
+        case d =>
+          val buffer = Utils.serialize(d)
+          out.writeInt(buffer.length)
+          out.write(buffer)
+      }
+    }
+
+    writeVariableLength(
+      if (message.getMessage != null) message.getMessage.toString else null
+    )
+    writeVariableLength(message.getChannel)
+    writeVariableLength(message.getPublisher)
+    writeVariableLength(message.getSubscription)
+
+    out.writeLong(message.getTimetoken)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+    def readVariableLength(): Any = {
+      in.readInt match {
+        case -1 => null
+        case length =>
+          val buffer = new Array[Byte](length)
+          in.readFully(buffer)
+          Utils.deserialize(buffer)
+      }
+    }
+
+    val parser = new JsonParser
+    val builder = PNMessageResult.builder
+
+    readVariableLength() match {
+      case null =>
+      case data => builder.message(parser.parse(data.asInstanceOf[String]))
+    }
+
+    builder.channel(readVariableLength().asInstanceOf[String])
+    builder.publisher(readVariableLength().asInstanceOf[String])
+    builder.subscription(readVariableLength().asInstanceOf[String])
+    builder.timetoken(in.readLong())
+
+    message = builder.build()
+  }
+}
+
+private[pubnub]
+class PubNubReceiver(configuration: SparkPubNubNConfiguration,
+    channels: Seq[String],
+    channelGroups: Seq[String],
+    timeToken: Option[Long],
+    storageLevel: StorageLevel)
+  extends Receiver[SparkPubNubMessage](storageLevel) with Logging {
+
+  var client: PubNub = _
+
+  override def onStart(): Unit = {
+    client = new PubNub(configuration.toConfiguration)
+    client.addListener(
+      new SubscribeCallback() {
+        def status(pubNub: PubNub, status: PNStatus): Unit = {
+          if (status.isError) {
+            log.error(s"Encountered PubNub error: $status.")
+          }
+        }
+
+        def message(pubNub: PubNub, message: PNMessageResult): Unit = {
+          val record = new SparkPubNubMessage
+          record.message = message
+          store(record)
+        }
+
+        def presence(pubNub: PubNub, presence: PNPresenceEventResult): Unit = {
+        }
+      }
+    )
+    val builder = client.subscribe()
+      .channels(channels.toList.asJava)
+      .channelGroups(channelGroups.toList.asJava)
+    if (timeToken.isDefined) {
+      builder.withTimetoken(timeToken.get)
+    }
+    builder.execute()
+  }
+
+  override def onStop(): Unit = {
+    client.unsubscribeAll()
+    client.destroy()
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala
new file mode 100644
index 0000000..4aa47eb
--- /dev/null
+++ b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.util.{Set => JSet}
+
+import collection.JavaConverters._
+import com.pubnub.api.PNConfiguration
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object PubNubUtils {
+  /**
+   * Create an input stream that returns messages received from PubNub infrastructure.
+   * @param ssc Streaming context
+   * @param configuration PubNub client configuration
+   * @param channels Sequence of channels to subscribe
+   * @param channelGroups Sequence of channel groups to subscribe
+   * @param timeToken Optional point in time to start receiving messages from.
+   *                  Leave undefined to get only latest messages.
+   * @param storageLevel Storage level to use for storing the received objects
+   * @return Input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      configuration: PNConfiguration,
+      channels: Seq[String],
+      channelGroups: Seq[String],
+      timeToken: Option[Long] = None,
+      storageLevel: StorageLevel): ReceiverInputDStream[SparkPubNubMessage] = {
+    ssc.withNamedScope("PubNub Stream") {
+      new PubNubInputDStream(
+        ssc, configuration, channels, channelGroups, timeToken, storageLevel
+      )
+    }
+  }
+
+  /**
+   * Create an input stream that returns messages received from PubNub infrastructure.
+   * @param jssc Java streaming context
+   * @param configuration PubNub client configuration
+   * @param channels Set of channels to subscribe
+   * @param channelGroups Set of channel groups to subscribe
+   * @param timeToken Optional point in time to start receiving messages from.
+   *                  Specify <code>null</code> to get only latest messages.
+   * @param storageLevel Storage level to use for storing the received objects
+   * @return Input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      configuration: PNConfiguration,
+      channels: JSet[String],
+      channelGroups: JSet[String],
+      timeToken: Option[Long],
+      storageLevel: StorageLevel): JavaReceiverInputDStream[SparkPubNubMessage] = {
+    createStream(
+      jssc.ssc, configuration, Seq.empty ++ channels.asScala,
+      Seq.empty ++ channelGroups.asScala, timeToken, storageLevel
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..448fb5e
--- /dev/null
+++ b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java
new file mode 100644
index 0000000..507b992
--- /dev/null
+++ b/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub;
+
+import com.pubnub.api.PNConfiguration;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+import java.util.HashSet;
+
+public class JavaPubNubStreamSuite extends LocalJavaStreamingContext {
+    @Test
+    public void testPubNubStream() {
+        // Tests the API compatibility, but does not actually receive any data.
+        JavaReceiverInputDStream<SparkPubNubMessage> stream = PubNubUtils.createStream(
+                ssc, new PNConfiguration(), new HashSet<>(), new HashSet<>(), null,
+                StorageLevel.MEMORY_AND_DISK_SER_2()
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/resources/log4j.properties b/streaming-pubnub/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bcb37d2
--- /dev/null
+++ b/streaming-pubnub/src/test/resources/log4j.properties
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=INFO, console, file
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala
new file mode 100644
index 0000000..9d1c3e9
--- /dev/null
+++ b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+
+import com.google.gson.JsonParser
+import com.pubnub.api.models.consumer.pubsub.PNMessageResult
+
+import org.apache.spark.SparkFunSuite
+
+class MessageSerializationSuite extends SparkFunSuite {
+  test("Full example") {
+    checkMessageSerialization(
+      "{\"message\":\"Hello, World!\"}", "channel1",
+      "publisher1", "subscription1", System.currentTimeMillis * 10000
+    )
+  }
+
+  test("Message from channel") {
+    checkMessageSerialization("{\"message\":\"Hello, World!\"}", "c", "p", null, 13534398158620385L)
+  }
+
+  test("Message from subscription") {
+    checkMessageSerialization("{\"message\":\"Hello, World!\"}", null, "p", "s", 13534397812467596L)
+  }
+
+  def checkMessageSerialization(payload: String, channel: String,
+      publisher: String, subscription: String, timestamp: Long): Unit = {
+    val builder = PNMessageResult.builder
+      .message(if (payload != null) new JsonParser().parse(payload) else null)
+      .channel(channel)
+      .publisher(publisher)
+      .subscription(subscription)
+      .timetoken(timestamp)
+    val pubNubMessage = builder.build()
+    val sparkMessage = new SparkPubNubMessage
+    sparkMessage.message = pubNubMessage
+
+    // serializer
+    val byteOutStream = new ByteArrayOutputStream
+    val outputStream = new ObjectOutputStream(byteOutStream)
+    outputStream.writeObject(sparkMessage)
+    outputStream.flush()
+    outputStream.close()
+    byteOutStream.close()
+    val serializedBytes = byteOutStream.toByteArray
+
+    // deserialize
+    val byteInStream = new ByteArrayInputStream(serializedBytes)
+    val inputStream = new ObjectInputStream(byteInStream)
+    val deserializedMessage = inputStream.readObject().asInstanceOf[SparkPubNubMessage]
+    inputStream.close()
+    byteInStream.close()
+
+    assert(payload.equals(deserializedMessage.getPayload))
+    if (channel != null) {
+      assert(channel.equals(deserializedMessage.getChannel))
+    } else {
+      assert(deserializedMessage.getChannel == null)
+    }
+    if (subscription != null) {
+      assert(subscription.equals(deserializedMessage.getSubscription))
+    } else {
+      assert(deserializedMessage.getSubscription == null)
+    }
+    assert(publisher.equals(deserializedMessage.getPublisher))
+    val unixTimestamp = Math.ceil(timestamp / 10000).longValue()
+    assert(unixTimestamp.equals(deserializedMessage.getTimestamp))
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
new file mode 100644
index 0000000..aa461db
--- /dev/null
+++ b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.util.{Map => JMap}
+import java.util.UUID
+
+import collection.JavaConverters._
+import com.google.gson.JsonObject
+import com.pubnub.api.{PNConfiguration, PubNub, PubNubException}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time
+import org.scalatest.time.Span
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+
+class PubNubStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
+  val subscribeKey = "demo"
+  val publishKey = "demo"
+  val channel = "test"
+
+  var ssc: StreamingContext = _
+  var configuration: PNConfiguration = _
+  var client: PubNub = _
+
+  override def beforeAll(): Unit = {
+    configuration = new PNConfiguration()
+    configuration.setSubscribeKey(subscribeKey)
+    configuration.setPublishKey(publishKey)
+    client = new PubNub(configuration)
+  }
+
+  override def afterAll(): Unit = {
+    client.destroy()
+  }
+
+  before {
+    ssc = new StreamingContext("local[2]", this.getClass.getSimpleName, Seconds(1))
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+  }
+
+  test("Stream receives messages") {
+    val nbOfMsg = 5
+    var publishedMessages: List[JsonObject] = List()
+    @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+    val receiveStream = PubNubUtils.createStream(
+      ssc, configuration, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect().length > 0) {
+        receivedMessages = receivedMessages ++ List(rdd.first)
+        receivedMessages
+      }
+    }
+    ssc.start()
+
+    (1 to nbOfMsg).foreach(
+      _ => {
+        val message = new JsonObject()
+        message.addProperty("text", UUID.randomUUID().toString)
+        publishedMessages = message :: publishedMessages
+      }
+    )
+
+    eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+      publishedMessages.foreach(
+        m => if (!receivedMessages.map(m => m.getPayload).contains(m.toString)) publishMessage(m)
+      )
+      assert(
+        publishedMessages.map(m => m.toString).toSet
+          .subsetOf(receivedMessages.map(m => m.getPayload))
+      )
+      assert(channel.equals(receivedMessages.head.getChannel))
+    }
+  }
+
+  test("Message filtering") {
+    val config = new PNConfiguration()
+    config.setSubscribeKey(subscribeKey)
+    config.setPublishKey(publishKey)
+    config.setFilterExpression("language == 'english'")
+
+    @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+    val receiveStream = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect().length > 0) {
+        receivedMessages = receivedMessages ++ List(rdd.first)
+        receivedMessages
+      }
+    }
+    ssc.start()
+
+    eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+      val polishMessage = new JsonObject()
+      polishMessage.addProperty("text", "dzien dobry")
+      publishMessage(polishMessage, Map("language" -> "polish").asJava)
+
+      val englishMessage = new JsonObject()
+      englishMessage.addProperty("text", "good morning")
+      publishMessage(englishMessage, Map("language" -> "english").asJava)
+
+      assert(receivedMessages.map(m => m.getPayload).size == 1)
+      assert(receivedMessages.head.getPayload.equals(englishMessage.toString))
+    }
+  }
+
+  test("Test time token") {
+    val config = new PNConfiguration()
+    config.setSubscribeKey(subscribeKey)
+    config.setPublishKey(publishKey)
+
+    @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+    val currentTimeToken = client.time().sync().getTimetoken
+
+    // Register the subscriber with a time token in the future, past the point when we send the first message.
+    val receiveStream = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), Some(currentTimeToken + 5000*10000),
+      StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect().length > 0) {
+        receivedMessages = receivedMessages ++ List(rdd.first)
+        receivedMessages
+      }
+    }
+    ssc.start()
+
+    // Give time for subscription to successfully register.
+    Thread.sleep(1000)
+
+    // Make sure we publish the first message within the next 5 seconds.
+    // Otherwise the subscriber may receive it and the test will fail.
+    val message = new JsonObject()
+    message.addProperty("text", "past")
+    var timeToken = -1L
+    while (timeToken == -1L) {
+      timeToken = publishMessage(message = message, store = true)
+      Thread.sleep(500)
+    }
+
+    eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+      val message = new JsonObject()
+      message.addProperty("text", "future")
+      publishMessage(message)
+
+      assert(receivedMessages.map(m => m.getPayload).size == 1)
+      assert(receivedMessages.head.getPayload.equals(message.toString))
+    }
+  }
+
+  def publishMessage(message: JsonObject,
+      metadata: JMap[String, String] = Map.empty[String, String].asJava,
+      store: Boolean = false) : Long = {
+    try {
+      client.publish().channel(channel).meta(metadata)
+        .message(message).shouldStore(store).sync().getTimetoken
+    } catch {
+      case e: PubNubException =>
+        // Ignore quota limits on the demo account so the caller can retry;
+        // rethrow any other error.
+        if (!e.getErrormsg.contains("Account quota exceeded (2/1000000)")) {
+          throw new RuntimeException(e)
+        }
+        -1
+    }
+  }
+}