You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by ck...@apache.org on 2017/08/01 09:01:09 UTC

bahir git commit: [BAHIR-122][PubSub] Make "ServiceAccountCredentials" really broadcastable

Repository: bahir
Updated Branches:
  refs/heads/master a70ff538a -> e491610d8


[BAHIR-122][PubSub] Make "ServiceAccountCredentials" really broadcastable

Instead of requiring the key file to be present on every node of the
cluster, we read the key file content on the driver node and store its
bytes in ServiceAccountCredentials. When the provider is called, it
builds the credential from the in-memory key file content.

Closes #48


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e491610d
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e491610d
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e491610d

Branch: refs/heads/master
Commit: e491610d8a7a9cff1ed5087c1c0cbe6b2c29eb39
Parents: a70ff53
Author: Ire Sun <ir...@hotmail.com>
Authored: Tue Aug 1 02:00:58 2017 -0700
Committer: Christian Kadner <ck...@us.ibm.com>
Committed: Tue Aug 1 02:00:58 2017 -0700

----------------------------------------------------------------------
 streaming-pubsub/README.md                      | 25 +++++++
 streaming-pubsub/pom.xml                        | 10 +--
 .../streaming/pubsub/SparkGCPCredentials.scala  | 75 ++++++++++----------
 .../SparkGCPCredentialsBuilderSuite.scala       | 69 ++++++++++++------
 4 files changed, 115 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/e491610d/streaming-pubsub/README.md
----------------------------------------------------------------------
diff --git a/streaming-pubsub/README.md b/streaming-pubsub/README.md
index faf9826..db207ae 100644
--- a/streaming-pubsub/README.md
+++ b/streaming-pubsub/README.md
@@ -43,3 +43,28 @@ First you need to create credential by SparkGCPCredentials, it support four type
     JavaDStream<SparkPubsubMessage> lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential...) 
 
 See end-to-end examples at [Google Cloud Pubsub Examples](streaming-pubsub/examples)
+
+### Unit Test
+
+To run the PubSub test cases, you need to generate **Google API service account key files** and set the corresponding environment variables to enable the tests.
+
+#### To generate a service account key file with PubSub permission
+
+1. Go to the [Google API Console](https://console.cloud.google.com)
+2. Choose the `Credentials` tab > `Create credentials` button > `Service account key`
+3. Fill in the account name, assign `Role > Pub/Sub > Pub/Sub Editor`, and check the option `Furnish a private key` to create a key. You need to create two keys: one as a JSON key file and another as a P12 key file.
+4. The account email is the value shown as the `Service account ID`
+
+#### Set the environment variables and run the tests
+
+```
+mvn clean package -DskipTests -pl streaming-pubsub
+
+export ENABLE_PUBSUB_TESTS=1
+export GCP_TEST_ACCOUNT="THE_P12_SERVICE_ACCOUNT_ID_MENTIONED_ABOVE"
+export GCP_TEST_PROJECT_ID="YOUR_GCP_PROJECT_ID"
+export GCP_TEST_JSON_KEY_PATH=/path/to/pubsub/credential/files/Apache-Bahir-PubSub-1234abcd.json
+export GCP_TEST_P12_KEY_PATH=/path/to/pubsub/credential/files/Apache-Bahir-PubSub-5678efgh.p12
+
+mvn test -pl streaming-pubsub
+```

http://git-wip-us.apache.org/repos/asf/bahir/blob/e491610d/streaming-pubsub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml
index c3da90f..eb4fa25 100644
--- a/streaming-pubsub/pom.xml
+++ b/streaming-pubsub/pom.xml
@@ -56,11 +56,6 @@
       <version>1.6.0</version>
     </dependency>
     <dependency>
-      <groupId>com.google.cloud.bigdataoss</groupId>
-      <artifactId>util-hadoop</artifactId>
-      <version>1.6.0-hadoop2</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
@@ -72,6 +67,11 @@
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-jackson</artifactId>
+      <version>1.22.0</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/bahir/blob/e491610d/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
index 5cadde3..f15a521 100644
--- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
@@ -19,8 +19,11 @@ package org.apache.spark.streaming.pubsub
 
 import com.google.api.client.auth.oauth2.Credential
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
+import com.google.api.client.json.jackson.JacksonFactory
 import com.google.api.services.pubsub.PubsubScopes
-import com.google.cloud.hadoop.util.{EntriesCredentialConfiguration, HadoopCredentialConfiguration}
+import com.google.cloud.hadoop.util.{CredentialFactory, HttpTransportFactory}
+import java.io.{ByteArrayInputStream, File, FileNotFoundException, FileOutputStream}
+import java.nio.file.{Files, Paths}
 import java.util
 import org.apache.hadoop.conf.Configuration
 
@@ -58,46 +61,42 @@ private[pubsub] final case class ServiceAccountCredentials(
     p12FilePath: Option[String] = None,
     emailAccount: Option[String] = None)
     extends SparkGCPCredentials {
+  private val fileBytes = getFileBuffer
 
   override def provider: Credential = {
-    val conf = new Configuration(false)
-    conf.setBoolean(
-      EntriesCredentialConfiguration.BASE_KEY_PREFIX
-          + EntriesCredentialConfiguration.ENABLE_SERVICE_ACCOUNTS_SUFFIX,
-      true)
-    jsonFilePath match {
-      case Some(jsonFilePath) =>
-        conf.set(
-          EntriesCredentialConfiguration.BASE_KEY_PREFIX
-              + EntriesCredentialConfiguration.JSON_KEYFILE_SUFFIX,
-          jsonFilePath
-        )
-      case _ => // do nothing
-    }
-    p12FilePath match {
-      case Some(p12FilePath) =>
-        conf.set(
-          EntriesCredentialConfiguration.BASE_KEY_PREFIX
-              + EntriesCredentialConfiguration.SERVICE_ACCOUNT_KEYFILE_SUFFIX,
-          p12FilePath
-        )
-      case _ => // do nothing
-    }
-    emailAccount match {
-      case Some(emailAccount) =>
-        conf.set(
-          EntriesCredentialConfiguration.BASE_KEY_PREFIX
-              + EntriesCredentialConfiguration.SERVICE_ACCOUNT_EMAIL_SUFFIX,
-          emailAccount
-        )
-      case _ => // do nothing
-    }
+    val jsonFactory = new JacksonFactory
+    val scopes = new util.ArrayList(PubsubScopes.all())
+    val transport = HttpTransportFactory.createHttpTransport(
+      HttpTransportFactory.HttpTransportType.JAVA_NET, null)
+
+    if (!jsonFilePath.isEmpty) {
+      val stream = new ByteArrayInputStream(fileBytes)
+      CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
+        GoogleCredential.fromStream(stream, transport, jsonFactory)
+        .createScoped(scopes))
+    } else if (!p12FilePath.isEmpty && !emailAccount.isEmpty) {
+      val tempFile = File.createTempFile(emailAccount.get, ".p12")
+      tempFile.deleteOnExit
+      val p12Out = new FileOutputStream(tempFile)
+      p12Out.write(fileBytes, 0, fileBytes.length)
+      p12Out.close
+
+      new CredentialFactory.GoogleCredentialWithRetry(
+        new GoogleCredential.Builder().setTransport(transport)
+        .setJsonFactory(jsonFactory)
+        .setServiceAccountId(emailAccount.get)
+        .setServiceAccountScopes(scopes)
+        .setServiceAccountPrivateKeyFromP12File(tempFile)
+        .setRequestInitializer(new CredentialFactory.CredentialHttpRetryInitializer()))
+    } else (new CredentialFactory).getCredentialFromMetadataServiceAccount
+  }
 
-    HadoopCredentialConfiguration
-        .newBuilder()
-        .withConfiguration(conf)
-        .build()
-        .getCredential(new util.ArrayList(PubsubScopes.all()))
+  private def getFileBuffer: Array[Byte] = {
+    val filePath = jsonFilePath orElse p12FilePath
+    if (filePath.isEmpty) Array[Byte]()
+    else if (!Files.exists(Paths.get(filePath.get))) {
+      throw new FileNotFoundException(s"The key file path(${filePath.get}) doesn't exist.")
+    } else Files.readAllBytes(Paths.get(filePath.get))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/e491610d/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
index e47b0b2..bf68a07 100644
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
@@ -17,64 +17,91 @@
 
 package org.apache.spark.streaming.pubsub
 
-import java.io.FileNotFoundException
+import java.nio.file.{Files, Paths}
 
 import org.scalatest.concurrent.Timeouts
+import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.util.Utils
 import org.apache.spark.SparkFunSuite
 
-class SparkGCPCredentialsBuilderSuite extends SparkFunSuite with Timeouts {
+class SparkGCPCredentialsBuilderSuite
+    extends SparkFunSuite with Timeouts with BeforeAndAfter{
   private def builder = SparkGCPCredentials.builder
 
-  private val jsonCreds = ServiceAccountCredentials(
-    jsonFilePath = Option("json-key-path")
-  )
+  private val jsonFilePath = sys.env.get(PubsubTestUtils.envVarNameForJsonKeyPath)
+  private val p12FilePath = sys.env.get(PubsubTestUtils.envVarNameForP12KeyPath)
+  private val emailAccount = sys.env.get(PubsubTestUtils.envVarNameForAccount)
 
-  private val p12Creds = ServiceAccountCredentials(
-    p12FilePath = Option("p12-key-path"),
-    emailAccount = Option("email")
-  )
-
-  private val metadataCreds = ServiceAccountCredentials()
+  private def jsonAssumption {
+    assume(
+      !jsonFilePath.isEmpty,
+      s"as the environment variable ${PubsubTestUtils.envVarNameForJsonKeyPath} is not set.")
+  }
+  private def p12Assumption {
+    assume(
+      !p12FilePath.isEmpty,
+      s"as the environment variable ${PubsubTestUtils.envVarNameForP12KeyPath} is not set.")
+    assume(
+      !emailAccount.isEmpty,
+      s"as the environment variable ${PubsubTestUtils.envVarNameForAccount} is not set.")
+  }
 
   test("should build application default") {
     assert(builder.build() === ApplicationDefaultCredentials)
   }
 
   test("should build json service account") {
+    jsonAssumption
+
+    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
     assertResult(jsonCreds) {
-      builder.jsonServiceAccount(jsonCreds.jsonFilePath.get).build()
+      builder.jsonServiceAccount(jsonFilePath.get).build()
     }
   }
 
   test("should provide json creds") {
-    val thrown = intercept[FileNotFoundException] {
-      jsonCreds.provider
-    }
-    assert(thrown.getMessage === "json-key-path (No such file or directory)")
+    jsonAssumption
+
+    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
+    val credential = jsonCreds.provider
+    assert(credential.refreshToken, "Failed to retrive a new access token.")
   }
 
   test("should build p12 service account") {
+    p12Assumption
+
+    val p12Creds = ServiceAccountCredentials(
+      p12FilePath = p12FilePath, emailAccount = emailAccount)
     assertResult(p12Creds) {
-      builder.p12ServiceAccount(p12Creds.p12FilePath.get, p12Creds.emailAccount.get).build()
+      builder.p12ServiceAccount(p12FilePath.get, emailAccount.get).build()
     }
   }
 
   test("should provide p12 creds") {
-    val thrown = intercept[FileNotFoundException] {
-      p12Creds.provider
-    }
-    assert(thrown.getMessage === "p12-key-path (No such file or directory)")
+    p12Assumption
+
+    val p12Creds = ServiceAccountCredentials(
+      p12FilePath = p12FilePath, emailAccount = emailAccount)
+    val credential = p12Creds.provider
+    assert(credential.refreshToken, "Failed to retrive a new access token.")
   }
 
   test("should build metadata service account") {
+    val metadataCreds = ServiceAccountCredentials()
     assertResult(metadataCreds) {
       builder.metadataServiceAccount().build()
     }
   }
 
   test("SparkGCPCredentials classes should be serializable") {
+    jsonAssumption
+    p12Assumption
+
+    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
+    val p12Creds = ServiceAccountCredentials(
+      p12FilePath = p12FilePath, emailAccount = emailAccount)
+    val metadataCreds = ServiceAccountCredentials()
     assertResult(jsonCreds) {
       Utils.deserialize[ServiceAccountCredentials](Utils.serialize(jsonCreds))
     }