Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:25 UTC
[39/50] [abbrv] incubator-livy-website git commit: [BAHIR-116] Add spark streaming connector to Google Cloud Pub/Sub
Closes #42.
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/commit/56613263
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/tree/56613263
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/diff/56613263
Branch: refs/heads/master
Commit: 56613263ca405aa5b45f32565ad4641f0a7b9752
Parents: 2a43076
Author: Chen Bin <bc...@talend.com>
Authored: Thu Apr 27 17:18:32 2017 +0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Tue Jun 6 21:03:49 2017 -0700
----------------------------------------------------------------------
pom.xml | 1 +
streaming-pubsub/README.md | 45 +++
.../PubsubWordCount.scala | 159 +++++++++++
streaming-pubsub/pom.xml | 86 ++++++
.../streaming/pubsub/PubsubInputDStream.scala | 286 +++++++++++++++++++
.../spark/streaming/pubsub/PubsubUtils.scala | 105 +++++++
.../streaming/pubsub/SparkGCPCredentials.scala | 166 +++++++++++
.../streaming/LocalJavaStreamingContext.java | 44 +++
.../streaming/pubsub/JavaPubsubStreamSuite.java | 38 +++
.../src/test/resources/log4j.properties | 28 ++
.../spark/streaming/pubsub/PubsubFunSuite.scala | 46 +++
.../streaming/pubsub/PubsubStreamSuite.scala | 138 +++++++++
.../streaming/pubsub/PubsubTestUtils.scala | 142 +++++++++
.../SparkGCPCredentialsBuilderSuite.scala | 95 ++++++
14 files changed, 1379 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f76aac5..81f2e28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
<module>sql-streaming-mqtt</module>
<module>streaming-twitter</module>
<module>streaming-zeromq</module>
+ <module>streaming-pubsub</module>
</modules>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/README.md
----------------------------------------------------------------------
diff --git a/streaming-pubsub/README.md b/streaming-pubsub/README.md
new file mode 100644
index 0000000..faf9826
--- /dev/null
+++ b/streaming-pubsub/README.md
@@ -0,0 +1,45 @@
+A library for reading data from [Google Cloud Pub/Sub](https://cloud.google.com/pubsub/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+ libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-streaming-pubsub_2.11</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+ $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
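+
+For example, to include the package when submitting an application (the application jar and main class below are placeholders):
+
+ $ bin/spark-submit --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.2.0-SNAPSHOT \
+ --class com.example.MyPubsubApp my-application.jar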
+
+## Examples
+
+First you need to create a credential using `SparkGCPCredentials`; four types of credentials are supported:
+* application default
+ `SparkGCPCredentials.builder.build()`
+* JSON-type service account
+ `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
+* P12-type service account
+ `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()`
+* metadata service account (when running on Dataproc)
+ `SparkGCPCredentials.builder.metadataServiceAccount().build()`
+
+### Scala API
+
+ val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, credential, ...)
+
+### Java API
+
+ JavaDStream<SparkPubsubMessage> lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential, ...)
+
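+Putting it together, a minimal end-to-end word count in Scala might look like this (a sketch only; the project id, subscription name, and key file path are placeholders):
+
+ import org.apache.spark.SparkConf
+ import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.streaming.{Seconds, StreamingContext}
+ import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
+
+ val ssc = new StreamingContext(new SparkConf().setAppName("PubsubExample"), Seconds(2))
+ val credential = SparkGCPCredentials.builder.jsonServiceAccount("/path/to/key.json").build()
+ val messages = PubsubUtils.createStream(
+ ssc, "my-project", None, "my-subscription", credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+ messages.map(m => (new String(m.getData()), 1)).reduceByKey(_ + _).print()
+ ssc.start()
+ ssc.awaitTermination()
+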
+See end-to-end examples at [Google Cloud Pub/Sub Examples](streaming-pubsub/examples).
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala b/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala
new file mode 100644
index 0000000..00f1fa1
--- /dev/null
+++ b/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming.pubsub
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
+import com.google.api.client.json.jackson2.JacksonFactory
+import com.google.api.services.pubsub.Pubsub.Builder
+import com.google.api.services.pubsub.model.PublishRequest
+import com.google.api.services.pubsub.model.PubsubMessage
+import com.google.cloud.hadoop.util.RetryHttpInitializer
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.pubsub.ConnectionUtils
+import org.apache.spark.streaming.pubsub.PubsubTestUtils
+import org.apache.spark.streaming.pubsub.PubsubUtils
+import org.apache.spark.streaming.pubsub.SparkGCPCredentials
+import org.apache.spark.streaming.pubsub.SparkPubsubMessage
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.SparkConf
+
+
+/**
+ * Consumes messages from a Google Cloud Pub/Sub subscription and performs a word count.
+ * This example uses application default credentials, so you need to use the gcloud
+ * client to generate a token file before running it.
+ *
+ * Usage: PubsubWordCount <projectId> <subscription>
+ * <projectId> is the Google Cloud project id
+ * <subscription> is the subscription to a topic
+ *
+ * Example:
+ * # use gcloud client generate token file
+ * $ gcloud init
+ * $ gcloud auth application-default login
+ *
+ * # run the example
+ * $ bin/run-example \
+ * org.apache.spark.examples.streaming.pubsub.PubsubWordCount project_1 subscription_1
+ *
+ */
+object PubsubWordCount {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 2) {
+ System.err.println(
+ """
+ |Usage: PubsubWordCount <projectId> <subscription>
+ |
+ | <projectId> is the Google Cloud project id
+ | <subscription> is the subscription to a topic
+ |
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ val Seq(projectId, subscription) = args.toSeq
+
+ val sparkConf = new SparkConf().setAppName("PubsubWordCount")
+ val ssc = new StreamingContext(sparkConf, Milliseconds(2000))
+
+ val pubsubStream: ReceiverInputDStream[SparkPubsubMessage] = PubsubUtils.createStream(
+ ssc, projectId, None, subscription,
+ SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ val wordCounts =
+ pubsubStream.map(message => (new String(message.getData()), 1)).reduceByKey(_ + _)
+
+ wordCounts.print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+
+}
+
+/**
+ * A Pub/Sub publisher for demonstration purposes; it publishes messages in 10 one-second
+ * batches. The number of messages in each batch is set by <records-per-sec>,
+ * and each message contains a single word from the list
+ * ("google", "cloud", "pubsub", "say", "hello").
+ *
+ * Usage: PubsubPublisher <projectId> <topic> <records-per-sec>
+ *
+ * <projectId> is the Google Cloud project id
+ * <topic> is the topic of Google Cloud Pub/Sub
+ * <records-per-sec> is the rate of records per second to put onto the stream
+ *
+ * Example:
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.pubsub.PubsubPublisher project_1 topic_1 10`
+ */
+object PubsubPublisher {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 3) {
+ System.err.println(
+ """
+ |Usage: PubsubPublisher <projectId> <topic> <records-per-sec>
+ |
+ | <projectId> is the Google Cloud project id
+ | <topic> is the topic of Google Cloud Pub/Sub
+ | <records-per-sec> is the rate of records per second to put onto the topic
+ |
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ val Seq(projectId, topic, recordsPerSecond) = args.toSeq
+
+ val APP_NAME = this.getClass.getSimpleName
+
+ val client = new Builder(
+ GoogleNetHttpTransport.newTrustedTransport(),
+ JacksonFactory.getDefaultInstance(),
+ new RetryHttpInitializer(
+ SparkGCPCredentials.builder.build().provider,
+ APP_NAME
+ ))
+ .setApplicationName(APP_NAME)
+ .build()
+
+ val randomWords = List("google", "cloud", "pubsub", "say", "hello")
+ val publishRequest = new PublishRequest()
+ for (i <- 1 to 10) {
+ val messages = (1 to recordsPerSecond.toInt).map { recordNum =>
+ val randomWordIndex = Random.nextInt(randomWords.size)
+ new PubsubMessage().encodeData(randomWords(randomWordIndex).getBytes())
+ }
+ publishRequest.setMessages(messages.asJava)
+ client.projects().topics()
+ .publish(s"projects/$projectId/topics/$topic", publishRequest)
+ .execute()
+ println(s"Published data. topic: $topic; Mesaage: $publishRequest")
+
+ Thread.sleep(1000)
+ }
+
+ }
+}
+// scalastyle:on
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml
new file mode 100644
index 0000000..c3da90f
--- /dev/null
+++ b/streaming-pubsub/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>bahir-parent_2.11</artifactId>
+ <groupId>org.apache.bahir</groupId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-streaming-pubsub_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-pubsub</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Apache Bahir - Spark Streaming Google PubSub</name>
+ <url>http://bahir.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-pubsub</artifactId>
+ <version>v1-rev355-1.22.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>util</artifactId>
+ <version>1.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>util-hadoop</artifactId>
+ <version>1.6.0-hadoop2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
new file mode 100644
index 0000000..e769f2e
--- /dev/null
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.google.api.client.auth.oauth2.Credential
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
+import com.google.api.client.googleapis.json.GoogleJsonResponseException
+import com.google.api.client.json.jackson2.JacksonFactory
+import com.google.api.services.pubsub.Pubsub.Builder
+import com.google.api.services.pubsub.model.{AcknowledgeRequest, PubsubMessage, PullRequest}
+import com.google.api.services.pubsub.model.Subscription
+import com.google.cloud.hadoop.util.RetryHttpInitializer
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
+
+/**
+ * Input stream that subscribes to messages from a Google Cloud Pub/Sub subscription.
+ * @param project Google cloud project id
+ * @param topic Topic name for creating the subscription if needed
+ * @param subscription Pub/Sub subscription name
+ * @param credential Google cloud project credential to access Pub/Sub service
+ */
+private[streaming]
+class PubsubInputDStream(
+ _ssc: StreamingContext,
+ val project: String,
+ val topic: Option[String],
+ val subscription: String,
+ val credential: SparkGCPCredentials,
+ val _storageLevel: StorageLevel
+) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) {
+
+ override def getReceiver(): Receiver[SparkPubsubMessage] = {
+ new PubsubReceiver(project, topic, subscription, credential, _storageLevel)
+ }
+}
+
+/**
+ * A wrapper class for PubsubMessage instances with a custom serialization format.
+ *
+ * This is necessary because PubsubMessage uses inner data structures
+ * which are not serializable.
+ */
+class SparkPubsubMessage() extends Externalizable {
+
+ private[pubsub] var message = new PubsubMessage
+
+ def getData(): Array[Byte] = message.decodeData()
+
+ def getAttributes(): java.util.Map[String, String] = message.getAttributes
+
+ def getMessageId(): String = message.getMessageId
+
+ def getPublishTime(): String = message.getPublishTime
+
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+ message.decodeData() match {
+ case null => out.writeInt(-1)
+ case data =>
+ out.writeInt(data.size)
+ out.write(data)
+ }
+
+ message.getMessageId match {
+ case null => out.writeInt(-1)
+ case id =>
+ val idBuff = Utils.serialize(id)
+ out.writeInt(idBuff.length)
+ out.write(idBuff)
+ }
+
+ message.getPublishTime match {
+ case null => out.writeInt(-1)
+ case time =>
+ val publishTimeBuff = Utils.serialize(time)
+ out.writeInt(publishTimeBuff.length)
+ out.write(publishTimeBuff)
+ }
+
+ message.getAttributes match {
+ case null => out.writeInt(-1)
+ case attrs =>
+ out.writeInt(attrs.size())
+ for ((k, v) <- message.getAttributes.asScala) {
+ val keyBuff = Utils.serialize(k)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+ }
+
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+ in.readInt() match {
+ case -1 => message.encodeData(null)
+ case bodyLength =>
+ val data = new Array[Byte](bodyLength)
+ in.readFully(data)
+ message.encodeData(data)
+ }
+
+ in.readInt() match {
+ case -1 => message.setMessageId(null)
+ case idLength =>
+ val idBuff = new Array[Byte](idLength)
+ in.readFully(idBuff)
+ val id: String = Utils.deserialize(idBuff)
+ message.setMessageId(id)
+ }
+
+ in.readInt() match {
+ case -1 => message.setPublishTime(null)
+ case publishTimeLength =>
+ val publishTimeBuff = new Array[Byte](publishTimeLength)
+ in.readFully(publishTimeBuff)
+ val publishTime: String = Utils.deserialize(publishTimeBuff)
+ message.setPublishTime(publishTime)
+ }
+
+ in.readInt() match {
+ case -1 => message.setAttributes(null)
+ case numAttributes =>
+ val attributes = new java.util.HashMap[String, String]
+ for (i <- 0 until numAttributes) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ attributes.put(key, value)
+ }
+ message.setAttributes(attributes)
+ }
+ }
+}
+
+private[pubsub]
+object ConnectionUtils {
+ val transport = GoogleNetHttpTransport.newTrustedTransport()
+ val jacksonFactory = JacksonFactory.getDefaultInstance
+
+ // The topic or subscription already exists.
+ // This is an error on creation operations.
+ val ALREADY_EXISTS = 409
+
+ /**
+ * Response status codes on which the client can retry
+ */
+ val RESOURCE_EXHAUSTED = 429
+
+ val CANCELLED = 499
+
+ val INTERNAL = 500
+
+ val UNAVAILABLE = 503
+
+ val DEADLINE_EXCEEDED = 504
+
+ def retryable(status: Int): Boolean = {
+ status match {
+ case RESOURCE_EXHAUSTED | CANCELLED | INTERNAL | UNAVAILABLE | DEADLINE_EXCEEDED => true
+ case _ => false
+ }
+ }
+}
+
+
+private[pubsub]
+class PubsubReceiver(
+ project: String,
+ topic: Option[String],
+ subscription: String,
+ credential: SparkGCPCredentials,
+ storageLevel: StorageLevel)
+ extends Receiver[SparkPubsubMessage](storageLevel) {
+
+ val APP_NAME = "sparkstreaming-pubsub-receiver"
+
+ val INIT_BACKOFF = 100 // 100ms
+
+ val MAX_BACKOFF = 10 * 1000 // 10s
+
+ val MAX_MESSAGE = 1000
+
+ lazy val client = new Builder(
+ ConnectionUtils.transport,
+ ConnectionUtils.jacksonFactory,
+ new RetryHttpInitializer(credential.provider, APP_NAME))
+ .setApplicationName(APP_NAME)
+ .build()
+
+ val projectFullName: String = s"projects/$project"
+ val subscriptionFullName: String = s"$projectFullName/subscriptions/$subscription"
+
+ override def onStart(): Unit = {
+ topic match {
+ case Some(t) =>
+ val sub: Subscription = new Subscription
+ sub.setTopic(s"$projectFullName/topics/$t")
+ try {
+ client.projects().subscriptions().create(subscriptionFullName, sub).execute()
+ } catch {
+ case e: GoogleJsonResponseException =>
+ if (e.getDetails.getCode == ConnectionUtils.ALREADY_EXISTS) {
+ // Ignore subscription already exists exception.
+ } else {
+ reportError("Failed to create subscription", e)
+ }
+ case NonFatal(e) =>
+ reportError("Failed to create subscription", e)
+ }
+ case None => // do nothing
+ }
+ new Thread() {
+ override def run() {
+ receive()
+ }
+ }.start()
+ }
+
+ def receive(): Unit = {
+ val pullRequest = new PullRequest().setMaxMessages(MAX_MESSAGE).setReturnImmediately(false)
+ var backoff = INIT_BACKOFF
+ while (!isStopped()) {
+ try {
+ val pullResponse =
+ client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute()
+ val receivedMessages = pullResponse.getReceivedMessages.asScala.toList
+ store(receivedMessages
+ .map(x => {
+ val sm = new SparkPubsubMessage
+ sm.message = x.getMessage
+ sm
+ })
+ .iterator)
+
+ val ackRequest = new AcknowledgeRequest()
+ ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
+ client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()
+ backoff = INIT_BACKOFF
+ } catch {
+ case e: GoogleJsonResponseException =>
+ if (ConnectionUtils.retryable(e.getDetails.getCode)) {
+ Thread.sleep(backoff)
+ backoff = Math.min(backoff * 2, MAX_BACKOFF)
+ } else {
+ reportError("Failed to pull messages", e)
+ }
+ case NonFatal(e) => reportError("Failed to pull messages", e)
+ }
+ }
+ }
+
+ override def onStop(): Unit = {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
new file mode 100644
index 0000000..b4f02b9
--- /dev/null
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object PubsubUtils {
+
+ /**
+ * Create an input stream that receives messages pushed by a Pub/Sub publisher,
+ * using service account authentication.
+ *
+ * If a topic is given and the subscription doesn't exist,
+ * a subscription with the given name is created.
+ * Note: this receiver only receives messages that arrive after the subscription is created.
+ * If no topic is given, a not-found exception is thrown when the subscription doesn't exist.
+ *
+ * @param ssc StreamingContext object
+ * @param project Google cloud project id
+ * @param topic Topic name for creating the subscription if needed
+ * @param subscription Subscription name to subscribe to
+ * @param credentials SparkGCPCredentials to use for authenticating
+ * @param storageLevel RDD storage level
+ * @return
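+ *
+ * Example (a sketch with placeholder values; assumes an existing StreamingContext `ssc`):
+ * {{{
+ * val stream = PubsubUtils.createStream(
+ * ssc, "my-project", Some("my-topic"), "my-subscription",
+ * SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
+ * }}}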
+ */
+ def createStream(
+ ssc: StreamingContext,
+ project: String,
+ topic: Option[String],
+ subscription: String,
+ credentials: SparkGCPCredentials,
+ storageLevel: StorageLevel): ReceiverInputDStream[SparkPubsubMessage] = {
+ ssc.withNamedScope("pubsub stream") {
+
+ new PubsubInputDStream(
+ ssc,
+ project,
+ topic,
+ subscription,
+ credentials,
+ storageLevel)
+ }
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a Pub/Sub publisher
+ * using the given credentials.
+ *
+ * Throws a not-found exception if the subscription doesn't exist.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param project Google cloud project id
+ * @param subscription Subscription name to subscribe to
+ * @param credentials SparkGCPCredentials to use for authenticating
+ * @param storageLevel RDD storage level
+ * @return
+ */
+ def createStream(jssc: JavaStreamingContext, project: String, subscription: String,
+ credentials: SparkGCPCredentials, storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkPubsubMessage] = {
+ createStream(jssc.ssc, project, None, subscription, credentials, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a Pub/Sub publisher
+ * using the given credentials.
+ *
+ * If the subscription doesn't exist, a subscription with the given name is created.
+ * Note: this receiver only receives messages that arrive after the subscription is created.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param project Google cloud project id
+ * @param topic Topic name for creating the subscription if needed
+ * @param subscription Subscription name to subscribe to
+ * @param credentials SparkGCPCredentials to use for authenticating
+ * @param storageLevel RDD storage level
+ * @return
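+ *
+ * Example in Java (a sketch with placeholder values; assumes an existing JavaStreamingContext `jssc`):
+ * {{{
+ * JavaReceiverInputDStream<SparkPubsubMessage> stream = PubsubUtils.createStream(
+ * jssc, "my-project", "my-topic", "my-subscription",
+ * new SparkGCPCredentials.Builder().build(), StorageLevel.MEMORY_AND_DISK_SER_2());
+ * }}}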
+ */
+ def createStream(jssc: JavaStreamingContext,
+ project: String, topic: String, subscription: String,
+ credentials: SparkGCPCredentials, storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkPubsubMessage] = {
+ createStream(jssc.ssc, project, Some(topic), subscription, credentials, storageLevel)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
new file mode 100644
index 0000000..5cadde3
--- /dev/null
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import com.google.api.client.auth.oauth2.Credential
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
+import com.google.api.services.pubsub.PubsubScopes
+import com.google.cloud.hadoop.util.{EntriesCredentialConfiguration, HadoopCredentialConfiguration}
+import java.util
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * Serializable interface providing a method executors can call to obtain a
+ * Credential instance for authenticating to GCP services.
+ */
+private[pubsub] sealed trait SparkGCPCredentials extends Serializable {
+
+ def provider: Credential
+}
+
+/**
+ * Returns the application default credential
+ */
+private[pubsub] final case object ApplicationDefaultCredentials extends SparkGCPCredentials {
+
+ override def provider: Credential = {
+ GoogleCredential.getApplicationDefault.createScoped(PubsubScopes.all())
+ }
+}
+
+/**
+ * Returns a Service Account type Credential instance.
+ * If all parameters are None, the metadata service type is tried.
+ * If jsonFilePath is available, the JSON type is tried.
+ * If jsonFilePath is None and p12FilePath and emailAccount are available, the p12 type is tried.
+ *
+ * @param jsonFilePath file path for json
+ * @param p12FilePath file path for p12
+ * @param emailAccount email account for p12
+ */
+private[pubsub] final case class ServiceAccountCredentials(
+ jsonFilePath: Option[String] = None,
+ p12FilePath: Option[String] = None,
+ emailAccount: Option[String] = None)
+ extends SparkGCPCredentials {
+
+ override def provider: Credential = {
+ val conf = new Configuration(false)
+ conf.setBoolean(
+ EntriesCredentialConfiguration.BASE_KEY_PREFIX
+ + EntriesCredentialConfiguration.ENABLE_SERVICE_ACCOUNTS_SUFFIX,
+ true)
+ jsonFilePath match {
+ case Some(jsonFilePath) =>
+ conf.set(
+ EntriesCredentialConfiguration.BASE_KEY_PREFIX
+ + EntriesCredentialConfiguration.JSON_KEYFILE_SUFFIX,
+ jsonFilePath
+ )
+ case _ => // do nothing
+ }
+ p12FilePath match {
+ case Some(p12FilePath) =>
+ conf.set(
+ EntriesCredentialConfiguration.BASE_KEY_PREFIX
+ + EntriesCredentialConfiguration.SERVICE_ACCOUNT_KEYFILE_SUFFIX,
+ p12FilePath
+ )
+ case _ => // do nothing
+ }
+ emailAccount match {
+ case Some(emailAccount) =>
+ conf.set(
+ EntriesCredentialConfiguration.BASE_KEY_PREFIX
+ + EntriesCredentialConfiguration.SERVICE_ACCOUNT_EMAIL_SUFFIX,
+ emailAccount
+ )
+ case _ => // do nothing
+ }
+
+ HadoopCredentialConfiguration
+ .newBuilder()
+ .withConfiguration(conf)
+ .build()
+ .getCredential(new util.ArrayList(PubsubScopes.all()))
+ }
+
+}
+
+object SparkGCPCredentials {
+
+ /**
+ * Builder for SparkGCPCredentials instance.
+ */
+ class Builder {
+ private var creds: Option[SparkGCPCredentials] = None
+
+ /**
+ * Use a JSON-type key file for the service account credential
+ *
+ * @param jsonFilePath json type key file
+ * @return Reference to this SparkGCPCredentials.Builder
+ */
+ def jsonServiceAccount(jsonFilePath: String): Builder = {
+ creds = Option(ServiceAccountCredentials(Option(jsonFilePath)))
+ this
+ }
+
+ /**
+ * Use a p12-type key file for the service account credential
+ *
+ * @param p12FilePath p12 type key file
+ * @param emailAccount email of service account
+ * @return Reference to this SparkGCPCredentials.Builder
+ */
+ def p12ServiceAccount(p12FilePath: String, emailAccount: String): Builder = {
+ creds = Option(ServiceAccountCredentials(
+ p12FilePath = Option(p12FilePath), emailAccount = Option(emailAccount)))
+ this
+ }
+
+ /**
+ * Use the metadata service to obtain the service account credential
+ * @return Reference to this SparkGCPCredentials.Builder
+ */
+ def metadataServiceAccount(): Builder = {
+ creds = Option(ServiceAccountCredentials())
+ this
+ }
+
+ /**
+ * Returns the appropriate instance of SparkGCPCredentials given the configured
+ * parameters.
+ *
+ * - The service account credentials will be returned if they were provided.
+ *
+ * - The application default credentials will be returned otherwise.
+ * @return
+ */
+ def build(): SparkGCPCredentials = creds.getOrElse(ApplicationDefaultCredentials)
+
+ }
+
+ /**
+ * Creates a SparkGCPCredentials.Builder for constructing
+ * a SparkGCPCredentials instance.
+ *
+ * @return SparkGCPCredentials.Builder instance
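+ *
+ * Example (the key file path is a placeholder):
+ * {{{
+ * val credentials = SparkGCPCredentials.builder
+ * .jsonServiceAccount("/path/to/key.json")
+ * .build()
+ * }}}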
+ */
+ def builder: Builder = new Builder
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..cfedb5a
--- /dev/null
+++ b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java
new file mode 100644
index 0000000..360b9a9
--- /dev/null
+++ b/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaPubsubStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testPubsubStream() {
+ // tests the API, does not actually test data receiving
+ JavaReceiverInputDStream<SparkPubsubMessage> stream1 = PubsubUtils.createStream(
+ ssc, "project", "subscription",
+ new SparkGCPCredentials.Builder().build(), StorageLevel.MEMORY_AND_DISK_SER_2());
+
+ JavaReceiverInputDStream<SparkPubsubMessage> stream2 = PubsubUtils.createStream(
+ ssc, "project", "topic", "subscription",
+ new SparkGCPCredentials.Builder().build(), StorageLevel.MEMORY_AND_DISK_SER_2());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/resources/log4j.properties b/streaming-pubsub/src/test/resources/log4j.properties
new file mode 100644
index 0000000..75e3b53
--- /dev/null
+++ b/streaming-pubsub/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
new file mode 100644
index 0000000..acdceb7
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Helper trait that runs Google Cloud Pub/Sub real data-transfer tests, or
+ * ignores them, depending on whether the enabling environment variable is set.
+ */
+trait PubsubFunSuite extends SparkFunSuite {
+ import PubsubTestUtils._
+
+ /** Run the test if the environment variable is set, otherwise ignore it */
+ def testIfEnabled(testName: String)(testBody: => Unit) {
+ if (shouldRunTests) {
+ test(testName)(testBody)
+ } else {
+ ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
+ }
+ }
+
+ /** Run the given body of code only if Pub/Sub tests are enabled */
+ def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
+ if (shouldRunTests) {
+ body
+ } else {
+ ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
new file mode 100644
index 0000000..284950c
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import java.util.UUID
+
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Seconds
+
+class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAfter {
+
+ val batchDuration = Seconds(1)
+
+ private val master: String = "local[2]"
+
+ private val appName: String = this.getClass.getSimpleName
+
+ private val topicName: String = s"bahirStreamTestTopic_${UUID.randomUUID()}"
+
+ private val subscriptionName: String = s"${topicName}_sub"
+
+ private val subForCreateName: String = s"${topicName}_create_me"
+
+ private var ssc: StreamingContext = null
+ private var pubsubTestUtils: PubsubTestUtils = null
+ private var topicFullName: String = null
+ private var subscriptionFullName: String = null
+ private var subForCreateFullName: String = null
+
+ override def beforeAll(): Unit = {
+ runIfTestsEnabled("Prepare PubsubTestUtils") {
+ pubsubTestUtils = new PubsubTestUtils
+ topicFullName = pubsubTestUtils.getFullTopicPath(topicName)
+ subscriptionFullName = pubsubTestUtils.getFullSubscriptionPath(subscriptionName)
+ subForCreateFullName = pubsubTestUtils.getFullSubscriptionPath(subForCreateName)
+ pubsubTestUtils.createTopic(topicFullName)
+ pubsubTestUtils.createSubscription(topicFullName, subscriptionFullName)
+ }
+ }
+
+ override def afterAll(): Unit = {
+ if (pubsubTestUtils != null) {
+ pubsubTestUtils.removeSubscription(subForCreateFullName)
+ pubsubTestUtils.removeSubscription(subscriptionFullName)
+ pubsubTestUtils.removeTopic(topicFullName)
+ }
+ }
+
+ before {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ }
+
+ test("PubsubUtils API") {
+ val pubsubStream1 = PubsubUtils.createStream(
+ ssc, "project", None, "subscription",
+ PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ val pubsubStream2 = PubsubUtils.createStream(
+ ssc, "project", Some("topic"), "subscription",
+ PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+ }
+
+ testIfEnabled("pubsub input stream") {
+ val receiveStream = PubsubUtils.createStream(
+ ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
+ PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ @volatile var receiveMessages: List[SparkPubsubMessage] = List()
+ receiveStream.foreachRDD { rdd =>
+ if (rdd.collect().length > 0) {
+ receiveMessages = receiveMessages ::: List(rdd.first)
+ receiveMessages
+ }
+ }
+
+ ssc.start()
+
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ val sendMessages = pubsubTestUtils.generatorMessages(10)
+ pubsubTestUtils.publishData(topicFullName, sendMessages)
+ assert(sendMessages.map(m => new String(m.getData))
+ .contains(new String(receiveMessages(0).getData)))
+ assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
+ }
+ }
+
+ testIfEnabled("pubsub input stream, create pubsub") {
+ val receiveStream = PubsubUtils.createStream(
+ ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName,
+ PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ @volatile var receiveMessages: List[SparkPubsubMessage] = List()
+ receiveStream.foreachRDD { rdd =>
+ if (rdd.collect().length > 0) {
+ receiveMessages = receiveMessages ::: List(rdd.first)
+ receiveMessages
+ }
+ }
+
+ ssc.start()
+
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ val sendMessages = pubsubTestUtils.generatorMessages(10)
+ pubsubTestUtils.publishData(topicFullName, sendMessages)
+ assert(sendMessages.map(m => new String(m.getData))
+ .contains(new String(receiveMessages(0).getData)))
+ assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
new file mode 100644
index 0000000..9dd719a
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import scala.collection.JavaConverters._
+
+import com.google.api.services.pubsub.Pubsub
+import com.google.api.services.pubsub.Pubsub.Builder
+import com.google.api.services.pubsub.model.PublishRequest
+import com.google.api.services.pubsub.model.PubsubMessage
+import com.google.api.services.pubsub.model.Subscription
+import com.google.api.services.pubsub.model.Topic
+import com.google.cloud.hadoop.util.RetryHttpInitializer
+
+import org.apache.spark.internal.Logging
+
+private[pubsub] class PubsubTestUtils extends Logging {
+
+ val APP_NAME = this.getClass.getSimpleName
+
+ val client: Pubsub = {
+ new Builder(
+ ConnectionUtils.transport,
+ ConnectionUtils.jacksonFactory,
+ new RetryHttpInitializer(
+ PubsubTestUtils.credential.provider,
+ APP_NAME
+ ))
+ .setApplicationName(APP_NAME)
+ .build()
+ }
+
+ def createTopic(topic: String): Unit = {
+ val topicRequest = new Topic()
+ client.projects().topics().create(topic, topicRequest.setName(topic)).execute()
+ }
+
+ def createSubscription(topic: String, subscription: String): Unit = {
+ val subscriptionRequest = new Subscription()
+ client.projects().subscriptions().create(subscription,
+ subscriptionRequest.setTopic(topic).setName(subscription)).execute()
+ }
+
+ def publishData(topic: String, messages: List[SparkPubsubMessage]): Unit = {
+ val publishRequest = new PublishRequest()
+ publishRequest.setMessages(messages.map(m => m.message).asJava)
+ client.projects().topics().publish(topic, publishRequest).execute()
+ }
+
+ def removeSubscription(subscription: String): Unit = {
+ client.projects().subscriptions().delete(subscription).execute()
+ }
+
+ def removeTopic(topic: String): Unit = {
+ client.projects().topics().delete(topic).execute()
+ }
+
+ def generatorMessages(num: Int): List[SparkPubsubMessage] = {
+ (1 to num)
+ .map(n => {
+ val m = new PubsubMessage()
+ m.encodeData(s"data$n".getBytes)
+ m.setAttributes(Map("a1" -> s"v1$n", "a2" -> s"v2$n").asJava)
+ })
+ .map(m => {
+ val sm = new SparkPubsubMessage()
+ sm.message = m
+ sm
+ })
+ .toList
+ }
+
+ def getFullTopicPath(topic: String): String =
+ s"projects/${PubsubTestUtils.projectId}/topics/$topic"
+
+ def getFullSubscriptionPath(subscription: String): String =
+ s"projects/${PubsubTestUtils.projectId}/subscriptions/$subscription"
+
+}
+
+private[pubsub] object PubsubTestUtils {
+
+ val envVarNameForEnablingTests = "ENABLE_PUBSUB_TESTS"
+ val envVarNameForGoogleCloudProjectId = "GCP_TEST_PROJECT_ID"
+ val envVarNameForJsonKeyPath = "GCP_TEST_JSON_KEY_PATH"
+ val envVarNameForP12KeyPath = "GCP_TEST_P12_KEY_PATH"
+ val envVarNameForAccount = "GCP_TEST_ACCOUNT"
+
+ lazy val shouldRunTests = {
+ val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+ if (isEnvSet) {
+ // scalastyle:off println
+ // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+ println(
+ s"""
+ |Google Pub/Sub tests that actually send data have been enabled by setting the environment
+ |variable $envVarNameForEnablingTests to 1.
+ |This will create Pub/Sub topics and subscriptions on Google Cloud Platform.
+ |Please be aware that this may incur some Google Cloud costs.
+ |Set the environment variable $envVarNameForGoogleCloudProjectId to the desired project.
+ """.stripMargin)
+ // scalastyle:on println
+ }
+ isEnvSet
+ }
+
+ lazy val projectId = {
+ val id = sys.env.getOrElse(envVarNameForGoogleCloudProjectId,
+ throw new IllegalArgumentException(
+ s"Need to set environment varibable $envVarNameForGoogleCloudProjectId if enable test."))
+ // scalastyle:off println
+ // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+ println(s"Using project $id for creating Pub/Sub topic and subscription for tests.")
+ // scalastyle:on println
+ id
+ }
+
+ lazy val credential =
+ sys.env.get(envVarNameForJsonKeyPath)
+ .map(path => SparkGCPCredentials.builder.jsonServiceAccount(path).build())
+ .getOrElse(
+ sys.env.get(envVarNameForP12KeyPath)
+ .map(path => SparkGCPCredentials.builder.p12ServiceAccount(
+ path, sys.env.get(envVarNameForAccount).get
+ ).build())
+ .getOrElse(SparkGCPCredentials.builder.build()))
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
new file mode 100644
index 0000000..e47b0b2
--- /dev/null
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubsub
+
+import java.io.FileNotFoundException
+
+import org.scalatest.concurrent.Timeouts
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkFunSuite
+
+class SparkGCPCredentialsBuilderSuite extends SparkFunSuite with Timeouts {
+ private def builder = SparkGCPCredentials.builder
+
+ private val jsonCreds = ServiceAccountCredentials(
+ jsonFilePath = Option("json-key-path")
+ )
+
+ private val p12Creds = ServiceAccountCredentials(
+ p12FilePath = Option("p12-key-path"),
+ emailAccount = Option("email")
+ )
+
+ private val metadataCreds = ServiceAccountCredentials()
+
+ test("should build application default") {
+ assert(builder.build() === ApplicationDefaultCredentials)
+ }
+
+ test("should build json service account") {
+ assertResult(jsonCreds) {
+ builder.jsonServiceAccount(jsonCreds.jsonFilePath.get).build()
+ }
+ }
+
+ test("should provide json creds") {
+ val thrown = intercept[FileNotFoundException] {
+ jsonCreds.provider
+ }
+ assert(thrown.getMessage === "json-key-path (No such file or directory)")
+ }
+
+ test("should build p12 service account") {
+ assertResult(p12Creds) {
+ builder.p12ServiceAccount(p12Creds.p12FilePath.get, p12Creds.emailAccount.get).build()
+ }
+ }
+
+ test("should provide p12 creds") {
+ val thrown = intercept[FileNotFoundException] {
+ p12Creds.provider
+ }
+ assert(thrown.getMessage === "p12-key-path (No such file or directory)")
+ }
+
+ test("should build metadata service account") {
+ assertResult(metadataCreds) {
+ builder.metadataServiceAccount().build()
+ }
+ }
+
+ test("SparkGCPCredentials classes should be serializable") {
+ assertResult(jsonCreds) {
+ Utils.deserialize[ServiceAccountCredentials](Utils.serialize(jsonCreds))
+ }
+
+ assertResult(p12Creds) {
+ Utils.deserialize[ServiceAccountCredentials](Utils.serialize(p12Creds))
+ }
+
+ assertResult(metadataCreds) {
+ Utils.deserialize[ServiceAccountCredentials](Utils.serialize(metadataCreds))
+ }
+
+ assertResult(ApplicationDefaultCredentials) {
+ Utils.deserialize[ServiceAccountCredentials](Utils.serialize(ApplicationDefaultCredentials))
+ }
+ }
+
+}