Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:25 UTC

[39/50] [abbrv] incubator-livy-website git commit: [BAHIR-116] Add spark streaming connector to Google Cloud Pub/Sub

[BAHIR-116] Add spark streaming connector to Google Cloud Pub/Sub

Closes #42.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/commit/56613263
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/tree/56613263
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/diff/56613263

Branch: refs/heads/master
Commit: 56613263ca405aa5b45f32565ad4641f0a7b9752
Parents: 2a43076
Author: Chen Bin <bc...@talend.com>
Authored: Thu Apr 27 17:18:32 2017 +0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Tue Jun 6 21:03:49 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 streaming-pubsub/README.md                      |  45 +++
 .../PubsubWordCount.scala                       | 159 +++++++++++
 streaming-pubsub/pom.xml                        |  86 ++++++
 .../streaming/pubsub/PubsubInputDStream.scala   | 286 +++++++++++++++++++
 .../spark/streaming/pubsub/PubsubUtils.scala    | 105 +++++++
 .../streaming/pubsub/SparkGCPCredentials.scala  | 166 +++++++++++
 .../streaming/LocalJavaStreamingContext.java    |  44 +++
 .../streaming/pubsub/JavaPubsubStreamSuite.java |  38 +++
 .../src/test/resources/log4j.properties         |  28 ++
 .../spark/streaming/pubsub/PubsubFunSuite.scala |  46 +++
 .../streaming/pubsub/PubsubStreamSuite.scala    | 138 +++++++++
 .../streaming/pubsub/PubsubTestUtils.scala      | 142 +++++++++
 .../SparkGCPCredentialsBuilderSuite.scala       |  95 ++++++
 14 files changed, 1379 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f76aac5..81f2e28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
     <module>sql-streaming-mqtt</module>
     <module>streaming-twitter</module>
     <module>streaming-zeromq</module>
+    <module>streaming-pubsub</module>
   </modules>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/README.md
----------------------------------------------------------------------
diff --git a/streaming-pubsub/README.md b/streaming-pubsub/README.md
new file mode 100644
index 0000000..faf9826
--- /dev/null
+++ b/streaming-pubsub/README.md
@@ -0,0 +1,45 @@
+A library for reading data from [Google Cloud Pub/Sub](https://cloud.google.com/pubsub/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+    
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.2.0-SNAPSHOT"
+    
+Using Maven:
+    
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-pubsub_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+## Examples
+
+First you need to create a credential using SparkGCPCredentials; it supports four types of credentials:
+* application default
+    `SparkGCPCredentials.builder.build()`
+* json type service account
+    `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
+* p12 type service account
+    `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()`
+* metadata service account (when running on Dataproc)
+    `SparkGCPCredentials.builder.metadataServiceAccount().build()`
+
+### Scala API
+    
+    val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, credential, ..)
+    
+### Java API
+    
+    JavaDStream<SparkPubsubMessage> lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential...) 
+
+See end-to-end examples at [Google Cloud Pub/Sub Examples](streaming-pubsub/examples).
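
Putting the README pieces together, a minimal end-to-end Scala sketch might look like the following. It is a sketch only: the application name, project id, subscription name, and batch interval are placeholder values, and with `topic = None` the subscription must already exist.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}

    object PubsubPrinter {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("PubsubPrinter"), Seconds(2))

        // Application default credentials; swap in jsonServiceAccount(...),
        // p12ServiceAccount(...) or metadataServiceAccount() as needed.
        val credential = SparkGCPCredentials.builder.build()

        // topic = None attaches to an existing subscription.
        val messages = PubsubUtils.createStream(
          ssc, "my-project", None, "my-subscription",
          credential, StorageLevel.MEMORY_AND_DISK_SER_2)

        messages.map(m => new String(m.getData())).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }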

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala b/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala
new file mode 100644
index 0000000..00f1fa1
--- /dev/null
+++ b/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming.pubsub
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
+import com.google.api.client.json.jackson2.JacksonFactory
+import com.google.api.services.pubsub.Pubsub.Builder
+import com.google.api.services.pubsub.model.PublishRequest
+import com.google.api.services.pubsub.model.PubsubMessage
+import com.google.cloud.hadoop.util.RetryHttpInitializer
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.pubsub.ConnectionUtils
+import org.apache.spark.streaming.pubsub.PubsubTestUtils
+import org.apache.spark.streaming.pubsub.PubsubUtils
+import org.apache.spark.streaming.pubsub.SparkGCPCredentials
+import org.apache.spark.streaming.pubsub.SparkPubsubMessage
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.SparkConf
+
+
+/**
+ * Consumes messages from a Google Cloud Pub/Sub subscription and does word count.
+ * This example uses application default credentials, so you need to use the gcloud
+ * client to generate a token file before running it.
+ *
+ * Usage: PubsubWordCount <projectId> <subscription>
+ *   <projectId> is the Google Cloud project id
+ *   <subscription> is the subscription to a topic
+ *
+ * Example:
+ *  # use the gcloud client to generate a token file
+ *  $ gcloud init
+ *  $ gcloud auth application-default login
+ *
+ *  # run the example
+ *  $ bin/run-example \
+ *      org.apache.spark.examples.streaming.pubsub.PubsubWordCount project_1 subscription_1
+ *
+ */
+object PubsubWordCount {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      System.err.println(
+        """
+          |Usage: PubsubWordCount <projectId> <subscription>
+          |
+          |     <projectId> is the Google Cloud project id
+          |     <subscription> is the subscription to a topic
+          |
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    val Seq(projectId, subscription) = args.toSeq
+
+    val sparkConf = new SparkConf().setAppName("PubsubWordCount")
+    val ssc = new StreamingContext(sparkConf, Milliseconds(2000))
+
+    val pubsubStream: ReceiverInputDStream[SparkPubsubMessage] = PubsubUtils.createStream(
+      ssc, projectId, None, subscription,
+      SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    val wordCounts =
+      pubsubStream.map(message => (new String(message.getData()), 1)).reduceByKey(_ + _)
+
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+
+}
+
+/**
+ * A Pub/Sub publisher for demonstration purposes. It publishes messages in 10
+ * one-second batches; the number of messages in each batch is set by
+ * <records-per-sec>, and each message contains a single word from the list
+ * ("google", "cloud", "pubsub", "say", "hello").
+ *
+ * Usage: PubsubPublisher <projectId> <topic> <records-per-sec>
+ *
+ *   <projectId> is the Google Cloud project id
+ *   <topic> is the Google Cloud Pub/Sub topic
+ *   <records-per-sec> is the rate of records per second to put onto the topic
+ *
+ * Example:
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.pubsub.PubsubPublisher project_1 topic_1 10`
+ */
+object PubsubPublisher {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 3) {
+      System.err.println(
+        """
+          |Usage: PubsubPublisher <projectId> <topic> <records-per-sec>
+          |
+          |     <projectId> is the Google Cloud project id
+          |     <topic> is the Google Cloud Pub/Sub topic
+          |     <records-per-sec> is the rate of records per second to put onto the topic
+          |
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    val Seq(projectId, topic, recordsPerSecond) = args.toSeq
+
+    val APP_NAME = this.getClass.getSimpleName
+
+    val client = new Builder(
+      GoogleNetHttpTransport.newTrustedTransport(),
+      JacksonFactory.getDefaultInstance(),
+      new RetryHttpInitializer(
+        SparkGCPCredentials.builder.build().provider,
+        APP_NAME
+      ))
+        .setApplicationName(APP_NAME)
+        .build()
+
+    val randomWords = List("google", "cloud", "pubsub", "say", "hello")
+    val publishRequest = new PublishRequest()
+    for (i <- 1 to 10) {
+      val messages = (1 to recordsPerSecond.toInt).map { recordNum =>
+          val randomWordIndex = Random.nextInt(randomWords.size)
+          new PubsubMessage().encodeData(randomWords(randomWordIndex).getBytes())
+      }
+      publishRequest.setMessages(messages.asJava)
+      client.projects().topics()
+          .publish(s"projects/$projectId/topics/$topic", publishRequest)
+          .execute()
+      println(s"Published data. topic: $topic; Message: $publishRequest")
+
+      Thread.sleep(1000)
+    }
+
+  }
+}
+// scalastyle:on

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml
new file mode 100644
index 0000000..c3da90f
--- /dev/null
+++ b/streaming-pubsub/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>bahir-parent_2.11</artifactId>
+    <groupId>org.apache.bahir</groupId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>spark-streaming-pubsub_2.11</artifactId>
+  <properties>
+    <sbt.project.name>streaming-pubsub</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Apache Bahir - Spark Streaming Google PubSub</name>
+  <url>http://bahir.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-pubsub</artifactId>
+      <version>v1-rev355-1.22.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util</artifactId>
+      <version>1.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util-hadoop</artifactId>
+      <version>1.6.0-hadoop2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
new file mode 100644
index 0000000..e769f2e
--- /dev/null
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.google.api.client.auth.oauth2.Credential
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
+import com.google.api.client.googleapis.json.GoogleJsonResponseException
+import com.google.api.client.json.jackson2.JacksonFactory
+import com.google.api.services.pubsub.Pubsub.Builder
+import com.google.api.services.pubsub.model.{AcknowledgeRequest, PubsubMessage, PullRequest}
+import com.google.api.services.pubsub.model.Subscription
+import com.google.cloud.hadoop.util.RetryHttpInitializer
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
+
+/**
+ * Input stream that subscribes to messages from a Google Cloud Pub/Sub subscription.
+ * @param project         Google Cloud project id
+ * @param topic           Topic name for creating the subscription if needed
+ * @param subscription    Pub/Sub subscription name
+ * @param credential      Google Cloud credential used to access the Pub/Sub service
+ */
+private[streaming]
+class PubsubInputDStream(
+    _ssc: StreamingContext,
+    val project: String,
+    val topic: Option[String],
+    val subscription: String,
+    val credential: SparkGCPCredentials,
+    val _storageLevel: StorageLevel
+) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) {
+
+  override def getReceiver(): Receiver[SparkPubsubMessage] = {
+    new PubsubReceiver(project, topic, subscription, credential, _storageLevel)
+  }
+}
+
+/**
+ * A wrapper class for PubsubMessage objects with a custom serialization format.
+ *
+ * This is necessary because PubsubMessage uses inner data structures
+ * which are not serializable.
+ */
+class SparkPubsubMessage() extends Externalizable {
+
+  private[pubsub] var message = new PubsubMessage
+
+  def getData(): Array[Byte] = message.decodeData()
+
+  def getAttributes(): java.util.Map[String, String] = message.getAttributes
+
+  def getMessageId(): String = message.getMessageId
+
+  def getPublishTime(): String = message.getPublishTime
+
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+    message.decodeData() match {
+      case null => out.writeInt(-1)
+      case data =>
+        out.writeInt(data.size)
+        out.write(data)
+    }
+
+    message.getMessageId match {
+      case null => out.writeInt(-1)
+      case id =>
+        val idBuff = Utils.serialize(id)
+        out.writeInt(idBuff.length)
+        out.write(idBuff)
+    }
+
+    message.getPublishTime match {
+      case null => out.writeInt(-1)
+      case time =>
+        val publishTimeBuff = Utils.serialize(time)
+        out.writeInt(publishTimeBuff.length)
+        out.write(publishTimeBuff)
+    }
+
+    message.getAttributes match {
+      case null => out.writeInt(-1)
+      case attrs =>
+        out.writeInt(attrs.size())
+        for ((k, v) <- message.getAttributes.asScala) {
+          val keyBuff = Utils.serialize(k)
+          out.writeInt(keyBuff.length)
+          out.write(keyBuff)
+          val valBuff = Utils.serialize(v)
+          out.writeInt(valBuff.length)
+          out.write(valBuff)
+        }
+    }
+  }
+
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+    in.readInt() match {
+      case -1 => message.encodeData(null)
+      case bodyLength =>
+        val data = new Array[Byte](bodyLength)
+        in.readFully(data)
+        message.encodeData(data)
+    }
+
+    in.readInt() match {
+      case -1 => message.setMessageId(null)
+      case idLength =>
+        val idBuff = new Array[Byte](idLength)
+        in.readFully(idBuff)
+        val id: String = Utils.deserialize(idBuff)
+        message.setMessageId(id)
+    }
+
+    in.readInt() match {
+      case -1 => message.setPublishTime(null)
+      case publishTimeLength =>
+        val publishTimeBuff = new Array[Byte](publishTimeLength)
+        in.readFully(publishTimeBuff)
+        val publishTime: String = Utils.deserialize(publishTimeBuff)
+        message.setPublishTime(publishTime)
+    }
+
+    in.readInt() match {
+      case -1 => message.setAttributes(null)
+      case numAttributes =>
+        val attributes = new java.util.HashMap[String, String]
+        for (i <- 0 until numAttributes) {
+          val keyLength = in.readInt()
+          val keyBuff = new Array[Byte](keyLength)
+          in.readFully(keyBuff)
+          val key: String = Utils.deserialize(keyBuff)
+
+          val valLength = in.readInt()
+          val valBuff = new Array[Byte](valLength)
+          in.readFully(valBuff)
+          val value: String = Utils.deserialize(valBuff)
+
+          attributes.put(key, value)
+        }
+        message.setAttributes(attributes)
+    }
+  }
+}
+
+private[pubsub]
+object ConnectionUtils {
+  val transport = GoogleNetHttpTransport.newTrustedTransport()
+  val jacksonFactory = JacksonFactory.getDefaultInstance
+
+  // The topic or subscription already exists.
+  // This is an error on creation operations.
+  val ALREADY_EXISTS = 409
+
+  /**
+   * Response statuses on which the client can retry
+   */
+  val RESOURCE_EXHAUSTED = 429
+
+  val CANCELLED = 499
+
+  val INTERNAL = 500
+
+  val UNAVAILABLE = 503
+
+  val DEADLINE_EXCEEDED = 504
+
+  def retryable(status: Int): Boolean = {
+    status match {
+      case RESOURCE_EXHAUSTED | CANCELLED | INTERNAL | UNAVAILABLE | DEADLINE_EXCEEDED => true
+      case _ => false
+    }
+  }
+}
+
+
+private[pubsub]
+class PubsubReceiver(
+    project: String,
+    topic: Option[String],
+    subscription: String,
+    credential: SparkGCPCredentials,
+    storageLevel: StorageLevel)
+    extends Receiver[SparkPubsubMessage](storageLevel) {
+
+  val APP_NAME = "sparkstreaming-pubsub-receiver"
+
+  val INIT_BACKOFF = 100 // 100ms
+
+  val MAX_BACKOFF = 10 * 1000 // 10s
+
+  val MAX_MESSAGE = 1000
+
+  lazy val client = new Builder(
+    ConnectionUtils.transport,
+    ConnectionUtils.jacksonFactory,
+    new RetryHttpInitializer(credential.provider, APP_NAME))
+      .setApplicationName(APP_NAME)
+      .build()
+
+  val projectFullName: String = s"projects/$project"
+  val subscriptionFullName: String = s"$projectFullName/subscriptions/$subscription"
+
+  override def onStart(): Unit = {
+    topic match {
+      case Some(t) =>
+        val sub: Subscription = new Subscription
+        sub.setTopic(s"$projectFullName/topics/$t")
+        try {
+          client.projects().subscriptions().create(subscriptionFullName, sub).execute()
+        } catch {
+          case e: GoogleJsonResponseException =>
+            if (e.getDetails.getCode == ConnectionUtils.ALREADY_EXISTS) {
+              // Ignore subscription already exists exception.
+            } else {
+              reportError("Failed to create subscription", e)
+            }
+          case NonFatal(e) =>
+            reportError("Failed to create subscription", e)
+        }
+      case None => // do nothing
+    }
+    new Thread() {
+      override def run() {
+        receive()
+      }
+    }.start()
+  }
+
+  def receive(): Unit = {
+    val pullRequest = new PullRequest().setMaxMessages(MAX_MESSAGE).setReturnImmediately(false)
+    var backoff = INIT_BACKOFF
+    while (!isStopped()) {
+      try {
+        val pullResponse =
+          client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute()
+        val receivedMessages = pullResponse.getReceivedMessages.asScala.toList
+        store(receivedMessages
+            .map(x => {
+              val sm = new SparkPubsubMessage
+              sm.message = x.getMessage
+              sm
+            })
+            .iterator)
+
+        val ackRequest = new AcknowledgeRequest()
+        ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
+        client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()
+        backoff = INIT_BACKOFF
+      } catch {
+        case e: GoogleJsonResponseException =>
+          if (ConnectionUtils.retryable(e.getDetails.getCode)) {
+            Thread.sleep(backoff)
+            backoff = Math.min(backoff * 2, MAX_BACKOFF)
+          } else {
+            reportError("Failed to pull messages", e)
+          }
+        case NonFatal(e) => reportError("Failed to pull messages", e)
+      }
+    }
+  }
+
+  override def onStop(): Unit = {}
+}
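
SparkPubsubMessage above exists because the stock PubsubMessage holds non-serializable internals, so it hand-rolls its wire format through Externalizable. A minimal round-trip sketch of that mechanism (object name and payload are invented; it must live in the org.apache.spark.streaming.pubsub package because the message field is private[pubsub]):

    package org.apache.spark.streaming.pubsub

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    import com.google.api.services.pubsub.model.PubsubMessage

    object RoundTripSketch {
      def main(args: Array[String]): Unit = {
        val original = new SparkPubsubMessage()
        original.message = new PubsubMessage().encodeData("hello".getBytes("UTF-8"))

        // Plain Java serialization delegates to writeExternal/readExternal above.
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(original)
        out.close()

        val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
        val copy = in.readObject().asInstanceOf[SparkPubsubMessage]
        println(new String(copy.getData(), "UTF-8")) // prints "hello"
      }
    }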

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
new file mode 100644
index 0000000..b4f02b9
--- /dev/null
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object PubsubUtils {
+
+  /**
+   * Create an input stream that receives messages pushed by a Pub/Sub publisher
+   * using service account authentication.
+   *
+   * If a topic is given and the subscription doesn't exist,
+   * a subscription with the given name is created.
+   * Note: the receiver only sees messages that arrive after the subscription is created.
+   * If no topic is given, a not-found exception is thrown when the subscription doesn't exist.
+   *
+   * @param ssc             StreamingContext object
+   * @param project         Google cloud project id
+   * @param topic           Topic name for creating the subscription if needed
+   * @param subscription    Subscription name to subscribe to
+   * @param credentials     SparkGCPCredentials to use for authenticating
+   * @param storageLevel    RDD storage level
+   * @return
+   */
+  def createStream(
+      ssc: StreamingContext,
+      project: String,
+      topic: Option[String],
+      subscription: String,
+      credentials: SparkGCPCredentials,
+      storageLevel: StorageLevel): ReceiverInputDStream[SparkPubsubMessage] = {
+    ssc.withNamedScope("pubsub stream") {
+
+      new PubsubInputDStream(
+        ssc,
+        project,
+        topic,
+        subscription,
+        credentials,
+        storageLevel)
+    }
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a Pub/Sub publisher
+   * using the given credential.
+   *
+   * Throws a not-found exception if the subscription doesn't exist.
+   *
+   * @param jssc         JavaStreamingContext object
+   * @param project      Google cloud project id
+   * @param subscription Subscription name to subscribe to
+   * @param credentials  SparkGCPCredentials to use for authenticating
+   * @param storageLevel RDD storage level
+   * @return
+   */
+  def createStream(jssc: JavaStreamingContext, project: String, subscription: String,
+      credentials: SparkGCPCredentials, storageLevel: StorageLevel
+      ): JavaReceiverInputDStream[SparkPubsubMessage] = {
+    createStream(jssc.ssc, project, None, subscription, credentials, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a Pub/Sub publisher
+   * using the given credential.
+   *
+   * If the subscription doesn't exist, a subscription with the given name is created.
+   * Note: the receiver only sees messages that arrive after the subscription is created.
+   *
+   * @param jssc            JavaStreamingContext object
+   * @param project         Google cloud project id
+   * @param topic           Topic name for creating the subscription if needed
+   * @param subscription    Subscription name to subscribe to
+   * @param credentials     SparkGCPCredentials to use for authenticating
+   * @param storageLevel    RDD storage level
+   * @return
+   */
+  def createStream(jssc: JavaStreamingContext,
+      project: String, topic: String, subscription: String,
+      credentials: SparkGCPCredentials, storageLevel: StorageLevel
+  ): JavaReceiverInputDStream[SparkPubsubMessage] = {
+    createStream(jssc.ssc, project, Some(topic), subscription, credentials, storageLevel)
+  }
+}
+
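
The three createStream overloads reduce to one behavior switch: whether a topic is supplied. A hedged sketch of the two modes, assuming a StreamingContext `ssc` and the imports from the earlier example (project, topic, and subscription names are placeholders):

    // topic = None: attach to an existing subscription; a not-found
    // error surfaces if it is missing.
    val attached = PubsubUtils.createStream(
      ssc, "my-project", None, "existing-subscription",
      SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)

    // topic = Some(...): the receiver creates "fresh-subscription" on start
    // if absent; only messages published afterwards will be received.
    val autoCreated = PubsubUtils.createStream(
      ssc, "my-project", Some("my-topic"), "fresh-subscription",
      SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)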

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
new file mode 100644
index 0000000..5cadde3
--- /dev/null
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import com.google.api.client.auth.oauth2.Credential
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
+import com.google.api.services.pubsub.PubsubScopes
+import com.google.cloud.hadoop.util.{EntriesCredentialConfiguration, HadoopCredentialConfiguration}
+import java.util
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * Serializable interface providing a method executors can call to obtain a
+ * Credential instance for authenticating to GCP services.
+ */
+private[pubsub] sealed trait SparkGCPCredentials extends Serializable {
+
+  def provider: Credential
+}
+
+/**
+ * Returns an application default credential
+ */
+private[pubsub] final case object ApplicationDefaultCredentials extends SparkGCPCredentials {
+
+  override def provider: Credential = {
+    GoogleCredential.getApplicationDefault.createScoped(PubsubScopes.all())
+  }
+}
+
+/**
+ * Returns a service account type Credential instance.
+ * If all parameters are None, the metadata service type is tried.
+ * If jsonFilePath is available, the json type is tried.
+ * If jsonFilePath is None and both p12FilePath and emailAccount are available, the p12 type is tried.
+ *
+ * @param jsonFilePath file path for json
+ * @param p12FilePath  file path for p12
+ * @param emailAccount email account for p12
+ */
+private[pubsub] final case class ServiceAccountCredentials(
+    jsonFilePath: Option[String] = None,
+    p12FilePath: Option[String] = None,
+    emailAccount: Option[String] = None)
+    extends SparkGCPCredentials {
+
+  override def provider: Credential = {
+    val conf = new Configuration(false)
+    conf.setBoolean(
+      EntriesCredentialConfiguration.BASE_KEY_PREFIX
+          + EntriesCredentialConfiguration.ENABLE_SERVICE_ACCOUNTS_SUFFIX,
+      true)
+    jsonFilePath match {
+      case Some(jsonFilePath) =>
+        conf.set(
+          EntriesCredentialConfiguration.BASE_KEY_PREFIX
+              + EntriesCredentialConfiguration.JSON_KEYFILE_SUFFIX,
+          jsonFilePath
+        )
+      case _ => // do nothing
+    }
+    p12FilePath match {
+      case Some(p12FilePath) =>
+        conf.set(
+          EntriesCredentialConfiguration.BASE_KEY_PREFIX
+              + EntriesCredentialConfiguration.SERVICE_ACCOUNT_KEYFILE_SUFFIX,
+          p12FilePath
+        )
+      case _ => // do nothing
+    }
+    emailAccount match {
+      case Some(emailAccount) =>
+        conf.set(
+          EntriesCredentialConfiguration.BASE_KEY_PREFIX
+              + EntriesCredentialConfiguration.SERVICE_ACCOUNT_EMAIL_SUFFIX,
+          emailAccount
+        )
+      case _ => // do nothing
+    }
+
+    HadoopCredentialConfiguration
+        .newBuilder()
+        .withConfiguration(conf)
+        .build()
+        .getCredential(new util.ArrayList(PubsubScopes.all()))
+  }
+
+}
+
+object SparkGCPCredentials {
+
+  /**
+   * Builder for SparkGCPCredentials instance.
+   */
+  class Builder {
+    private var creds: Option[SparkGCPCredentials] = None
+
+    /**
+     * Use a json type key file for the service account credential
+     *
+     * @param jsonFilePath json type key file
+     * @return Reference to this SparkGCPCredentials.Builder
+     */
+    def jsonServiceAccount(jsonFilePath: String): Builder = {
+      creds = Option(ServiceAccountCredentials(Option(jsonFilePath)))
+      this
+    }
+
+    /**
+     * Use a p12 type key file for the service account credential
+     *
+     * @param p12FilePath p12 type key file
+     * @param emailAccount email of service account
+     * @return Reference to this SparkGCPCredentials.Builder
+     */
+    def p12ServiceAccount(p12FilePath: String, emailAccount: String): Builder = {
+      creds = Option(ServiceAccountCredentials(
+        p12FilePath = Option(p12FilePath), emailAccount = Option(emailAccount)))
+      this
+    }
+
+    /**
+     * Use the metadata service to obtain a service account credential
+     * @return Reference to this SparkGCPCredentials.Builder
+     */
+    def metadataServiceAccount(): Builder = {
+      creds = Option(ServiceAccountCredentials())
+      this
+    }
+
+    /**
+     * Returns the appropriate instance of SparkGCPCredentials given the configured
+     * parameters.
+     *
+     * - The service account credentials will be returned if they were provided.
+     *
+     * - The application default credentials will be returned otherwise.
+     * @return
+     */
+    def build(): SparkGCPCredentials = creds.getOrElse(ApplicationDefaultCredentials)
+
+  }
+
+  /**
+   * Creates a SparkGCPCredentials.Builder for constructing
+   * a SparkGCPCredentials instance.
+   *
+   * @return SparkGCPCredentials.Builder instance
+   */
+  def builder: Builder = new Builder
+}
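
The builder holds at most one credential choice, and build() falls back to the application default when nothing was configured. A small sketch of selecting the credential type at runtime (the GOOGLE_JSON_KEY variable name is our own, not part of this module):

    import org.apache.spark.streaming.pubsub.SparkGCPCredentials

    val credentials = sys.env.get("GOOGLE_JSON_KEY") match {
      case Some(jsonPath) =>
        // JSON service account key; the file must be readable where it is used.
        SparkGCPCredentials.builder.jsonServiceAccount(jsonPath).build()
      case None =>
        // Nothing configured: falls back to ApplicationDefaultCredentials.
        SparkGCPCredentials.builder.build()
    }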

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..cfedb5a
--- /dev/null
+++ b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java
new file mode 100644
index 0000000..360b9a9
--- /dev/null
+++ b/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaPubsubStreamSuite extends LocalJavaStreamingContext {
+    @Test
+    public void testPubsubStream() {
+        // tests the API, does not actually test data receiving
+        JavaReceiverInputDStream<SparkPubsubMessage> stream1 = PubsubUtils.createStream(
+                ssc, "project", "subscription",
+                new SparkGCPCredentials.Builder().build(), StorageLevel.MEMORY_AND_DISK_SER_2());
+
+        JavaReceiverInputDStream<SparkPubsubMessage> stream2 = PubsubUtils.createStream(
+                ssc, "project", "topic", "subscription",
+                new SparkGCPCredentials.Builder().build(), StorageLevel.MEMORY_AND_DISK_SER_2());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/resources/log4j.properties b/streaming-pubsub/src/test/resources/log4j.properties
new file mode 100644
index 0000000..75e3b53
--- /dev/null
+++ b/streaming-pubsub/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
new file mode 100644
index 0000000..acdceb7
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Helper trait that runs Google Cloud Pub/Sub real data transfer tests or
+ * ignores them depending on whether the enabling environment variable is set.
+ */
+trait PubsubFunSuite extends SparkFunSuite {
+  import PubsubTestUtils._
+
+  /** Run the test if the enabling environment variable is set, otherwise ignore it */
+  def testIfEnabled(testName: String)(testBody: => Unit) {
+    if (shouldRunTests) {
+      test(testName)(testBody)
+    } else {
+      ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
+    }
+  }
+
+  /** Run the given body of code only if Pub/Sub tests are enabled */
+  def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
+    if (shouldRunTests) {
+      body
+    } else {
+      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
+    }
+  }
+}
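
A hedged sketch of how a concrete suite would use these helpers (class and test names are invented):

    class MyPubsubSuite extends PubsubFunSuite {
      // Runs as a normal test when ENABLE_PUBSUB_TESTS=1; otherwise it is
      // reported as ignored with a hint naming the enabling variable.
      testIfEnabled("publishes and receives one message") {
        // real data-transfer assertions would go here
      }
    }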

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
new file mode 100644
index 0000000..284950c
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import java.util.UUID
+
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Seconds
+
+class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAfter {
+
+  val batchDuration = Seconds(1)
+
+  private val master: String = "local[2]"
+
+  private val appName: String = this.getClass.getSimpleName
+
+  private val topicName: String = s"bahirStreamTestTopic_${UUID.randomUUID()}"
+
+  private val subscriptionName: String = s"${topicName}_sub"
+
+  private val subForCreateName: String = s"${topicName}_create_me"
+
+  private var ssc: StreamingContext = null
+  private var pubsubTestUtils: PubsubTestUtils = null
+  private var topicFullName: String = null
+  private var subscriptionFullName: String = null
+  private var subForCreateFullName: String = null
+
+  override def beforeAll(): Unit = {
+    runIfTestsEnabled("Prepare PubsubTestUtils") {
+      pubsubTestUtils = new PubsubTestUtils
+      topicFullName = pubsubTestUtils.getFullTopicPath(topicName)
+      subscriptionFullName = pubsubTestUtils.getFullSubscriptionPath(subscriptionName)
+      subForCreateFullName = pubsubTestUtils.getFullSubscriptionPath(subForCreateName)
+      pubsubTestUtils.createTopic(topicFullName)
+      pubsubTestUtils.createSubscription(topicFullName, subscriptionFullName)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    if (pubsubTestUtils != null) {
+      pubsubTestUtils.removeSubscription(subForCreateFullName)
+      pubsubTestUtils.removeSubscription(subscriptionFullName)
+      pubsubTestUtils.removeTopic(topicFullName)
+    }
+  }
+
+  before {
+    ssc = new StreamingContext(master, appName, batchDuration)
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+  }
+
+  test("PubsubUtils API") {
+    val pubsubStream1 = PubsubUtils.createStream(
+      ssc, "project", None, "subscription",
+      PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    val pubsubStream2 = PubsubUtils.createStream(
+      ssc, "project", Some("topic"), "subscription",
+      PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+  }
+
+  testIfEnabled("pubsub input stream") {
+    val receiveStream = PubsubUtils.createStream(
+      ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
+      PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    @volatile var receiveMessages: List[SparkPubsubMessage] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect().length > 0) {
+        receiveMessages = receiveMessages ::: List(rdd.first)
+        receiveMessages
+      }
+    }
+
+    ssc.start()
+
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      val sendMessages = pubsubTestUtils.generatorMessages(10)
+      pubsubTestUtils.publishData(topicFullName, sendMessages)
+      assert(sendMessages.map(m => new String(m.getData))
+          .contains(new String(receiveMessages(0).getData)))
+      assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
+    }
+  }
+
+  testIfEnabled("pubsub input stream, create pubsub") {
+    val receiveStream = PubsubUtils.createStream(
+      ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName,
+      PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    @volatile var receiveMessages: List[SparkPubsubMessage] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect().length > 0) {
+        receiveMessages = receiveMessages ::: List(rdd.first)
+        receiveMessages
+      }
+    }
+
+    ssc.start()
+
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      val sendMessages = pubsubTestUtils.generatorMessages(10)
+      pubsubTestUtils.publishData(topicFullName, sendMessages)
+      assert(sendMessages.map(m => new String(m.getData))
+          .contains(new String(receiveMessages(0).getData)))
+      assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
new file mode 100644
index 0000000..9dd719a
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import scala.collection.JavaConverters._
+
+import com.google.api.services.pubsub.Pubsub
+import com.google.api.services.pubsub.Pubsub.Builder
+import com.google.api.services.pubsub.model.PublishRequest
+import com.google.api.services.pubsub.model.PubsubMessage
+import com.google.api.services.pubsub.model.Subscription
+import com.google.api.services.pubsub.model.Topic
+import com.google.cloud.hadoop.util.RetryHttpInitializer
+
+import org.apache.spark.internal.Logging
+
+private[pubsub] class PubsubTestUtils extends Logging {
+
+  val APP_NAME = this.getClass.getSimpleName
+
+  val client: Pubsub = {
+    new Builder(
+      ConnectionUtils.transport,
+      ConnectionUtils.jacksonFactory,
+      new RetryHttpInitializer(
+        PubsubTestUtils.credential.provider,
+        APP_NAME
+      ))
+        .setApplicationName(APP_NAME)
+        .build()
+  }
+
+  def createTopic(topic: String): Unit = {
+    val topicRequest = new Topic()
+    client.projects().topics().create(topic, topicRequest.setName(topic)).execute()
+  }
+
+  def createSubscription(topic: String, subscription: String): Unit = {
+    val subscriptionRequest = new Subscription()
+    client.projects().subscriptions().create(subscription,
+      subscriptionRequest.setTopic(topic).setName(subscription)).execute()
+  }
+
+  def publishData(topic: String, messages: List[SparkPubsubMessage]): Unit = {
+    val publishRequest = new PublishRequest()
+    publishRequest.setMessages(messages.map(m => m.message).asJava)
+    client.projects().topics().publish(topic, publishRequest).execute()
+  }
+
+  def removeSubscription(subscription: String): Unit = {
+    client.projects().subscriptions().delete(subscription).execute()
+  }
+
+  def removeTopic(topic: String): Unit = {
+    client.projects().topics().delete(topic).execute()
+  }
+
+  def generatorMessages(num: Int): List[SparkPubsubMessage] = {
+    (1 to num)
+        .map(n => {
+          val m = new PubsubMessage()
+          m.encodeData(s"data$n".getBytes)
+          m.setAttributes(Map("a1" -> s"v1$n", "a2" -> s"v2$n").asJava)
+        })
+        .map(m => {
+          val sm = new SparkPubsubMessage()
+          sm.message = m
+          sm
+        })
+        .toList
+  }
+
+  def getFullTopicPath(topic: String): String =
+    s"projects/${PubsubTestUtils.projectId}/topics/$topic"
+
+  def getFullSubscriptionPath(subscription: String): String =
+    s"projects/${PubsubTestUtils.projectId}/subscriptions/$subscription"
+
+}
+
+private[pubsub] object PubsubTestUtils {
+
+  val envVarNameForEnablingTests = "ENABLE_PUBSUB_TESTS"
+  val envVarNameForGoogleCloudProjectId = "GCP_TEST_PROJECT_ID"
+  val envVarNameForJsonKeyPath = "GCP_TEST_JSON_KEY_PATH"
+  val envVarNameForP12KeyPath = "GCP_TEST_P12_KEY_PATH"
+  val envVarNameForAccount = "GCP_TEST_ACCOUNT"
+
+  lazy val shouldRunTests = {
+    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+    if (isEnvSet) {
+      // scalastyle:off println
+      // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+      println(
+        s"""
+           |Google Pub/Sub tests that actually send data have been enabled by setting the
+           |environment variable $envVarNameForEnablingTests to 1.
+           |This will create Pub/Sub topics and subscriptions on Google Cloud Platform.
+           |Please be aware that this may incur some Google Cloud costs.
+           |Set the environment variable $envVarNameForGoogleCloudProjectId to the desired project.
+        """.stripMargin)
+      // scalastyle:on println
+    }
+    isEnvSet
+  }
+
+  lazy val projectId = {
+    val id = sys.env.getOrElse(envVarNameForGoogleCloudProjectId,
+      throw new IllegalArgumentException(
+        s"Need to set the environment variable $envVarNameForGoogleCloudProjectId to enable the tests."))
+    // scalastyle:off println
+    // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+    println(s"Using project $id for creating Pub/Sub topic and subscription for tests.")
+    // scalastyle:on println
+    id
+  }
+
+  lazy val credential =
+    sys.env.get(envVarNameForJsonKeyPath)
+        .map(path => SparkGCPCredentials.builder.jsonServiceAccount(path).build())
+        .getOrElse(
+          sys.env.get(envVarNameForP12KeyPath)
+            .map(path => SparkGCPCredentials.builder.p12ServiceAccount(
+              path, sys.env.get(envVarNameForAccount).get
+            ).build())
+            .getOrElse(SparkGCPCredentials.builder.build()))
+}
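
In practice the real-data suites run only when ENABLE_PUBSUB_TESTS=1 and GCP_TEST_PROJECT_ID names a project the caller is willing to be billed for; GCP_TEST_JSON_KEY_PATH, or GCP_TEST_P12_KEY_PATH together with GCP_TEST_ACCOUNT, selects a service account key, and with neither set the application default credentials are used.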

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
new file mode 100644
index 0000000..e47b0b2
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import java.io.FileNotFoundException
+
+import org.scalatest.concurrent.Timeouts
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkFunSuite
+
+class SparkGCPCredentialsBuilderSuite extends SparkFunSuite with Timeouts {
+  private def builder = SparkGCPCredentials.builder
+
+  private val jsonCreds = ServiceAccountCredentials(
+    jsonFilePath = Option("json-key-path")
+  )
+
+  private val p12Creds = ServiceAccountCredentials(
+    p12FilePath = Option("p12-key-path"),
+    emailAccount = Option("email")
+  )
+
+  private val metadataCreds = ServiceAccountCredentials()
+
+  test("should build application default") {
+    assert(builder.build() === ApplicationDefaultCredentials)
+  }
+
+  test("should build json service account") {
+    assertResult(jsonCreds) {
+      builder.jsonServiceAccount(jsonCreds.jsonFilePath.get).build()
+    }
+  }
+
+  test("should provide json creds") {
+    val thrown = intercept[FileNotFoundException] {
+      jsonCreds.provider
+    }
+    assert(thrown.getMessage === "json-key-path (No such file or directory)")
+  }
+
+  test("should build p12 service account") {
+    assertResult(p12Creds) {
+      builder.p12ServiceAccount(p12Creds.p12FilePath.get, p12Creds.emailAccount.get).build()
+    }
+  }
+
+  test("should provide p12 creds") {
+    val thrown = intercept[FileNotFoundException] {
+      p12Creds.provider
+    }
+    assert(thrown.getMessage === "p12-key-path (No such file or directory)")
+  }
+
+  test("should build metadata service account") {
+    assertResult(metadataCreds) {
+      builder.metadataServiceAccount().build()
+    }
+  }
+
+  test("SparkGCPCredentials classes should be serializable") {
+    assertResult(jsonCreds) {
+      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(jsonCreds))
+    }
+
+    assertResult(p12Creds) {
+      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(p12Creds))
+    }
+
+    assertResult(metadataCreds) {
+      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(metadataCreds))
+    }
+
+    assertResult(ApplicationDefaultCredentials) {
+      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(ApplicationDefaultCredentials))
+    }
+  }
+
+}