You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/25 22:20:05 UTC

[2/3] beam git commit: [BEAM-1871] Move Bigquery/Pubsub options to sdks/java/io/google-cloud-platform

[BEAM-1871] Move Bigquery/Pubsub options to sdks/java/io/google-cloud-platform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f9f7bcd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f9f7bcd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f9f7bcd

Branch: refs/heads/master
Commit: 6f9f7bcd914d18eee4e00bd16d69c4d53636e244
Parents: 652a919
Author: Luke Cwik <lc...@google.com>
Authored: Mon Apr 24 15:58:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Apr 25 15:19:44 2017 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  22 ++
 .../beam/examples/common/ExampleUtils.java      |  60 ++++-
 .../examples/cookbook/BigQueryTornadoesIT.java  |   4 +-
 pom.xml                                         |   7 +
 .../options/DataflowPipelineOptions.java        |   4 +-
 sdks/java/extensions/gcp-core/pom.xml           |  29 +--
 .../beam/sdk/options/BigQueryOptions.java       |  32 ---
 .../options/GcpPipelineOptionsRegistrar.java    |   2 -
 .../apache/beam/sdk/options/PubsubOptions.java  |  36 ---
 .../beam/sdk/testing/BigqueryMatcher.java       | 256 ------------------
 .../apache/beam/sdk/testing/package-info.java   |  21 --
 .../org/apache/beam/sdk/util/Transport.java     |  36 ---
 .../org/apache/beam/GcpCoreApiSurfaceTest.java  |   2 -
 .../sdk/options/GoogleApiDebugOptionsTest.java  |   7 +-
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 176 -------------
 sdks/java/extensions/protobuf/pom.xml           |  19 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |   1 -
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   1 -
 .../sdk/io/gcp/bigquery/BigQueryOptions.java    |  39 +++
 .../io/gcp/bigquery/BigQueryQuerySource.java    |   1 -
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |   1 -
 .../io/gcp/bigquery/BigQueryServicesImpl.java   |  41 ++-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   1 -
 .../io/gcp/bigquery/BigQueryTableSource.java    |   1 -
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  |   1 -
 .../beam/sdk/io/gcp/bigquery/PrepareWrite.java  |   1 -
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   1 -
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |   1 -
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   1 -
 .../common/GcpIoPipelineOptionsRegistrar.java   |  39 +++
 .../beam/sdk/io/gcp/common/package-info.java    |  20 ++
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java    |   1 -
 .../sdk/io/gcp/pubsub/PubsubGrpcClient.java     |   1 -
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java |   1 -
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     |   1 -
 .../beam/sdk/io/gcp/pubsub/PubsubOptions.java   |  44 ++++
 .../sdk/io/gcp/pubsub/PubsubTestClient.java     |   1 -
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |   1 -
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |   1 -
 .../beam/sdk/io/gcp/testing/package-info.java   |  21 ++
 .../beam/sdk/io/gcp/GcpApiSurfaceTest.java      |   2 +
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   1 -
 .../io/gcp/bigquery/FakeBigQueryServices.java   |   1 -
 .../sdk/io/gcp/testing/BigqueryMatcher.java     | 257 +++++++++++++++++++
 .../sdk/io/gcp/testing/BigqueryMatcherTest.java | 177 +++++++++++++
 45 files changed, 737 insertions(+), 636 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index ae3d63d..9317136 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -485,6 +485,21 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-oauth2-http</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-credentials</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
     </dependency>
@@ -552,6 +567,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 6962571..2650f8e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
@@ -33,6 +34,10 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.pubsub.model.Subscription;
 import com.google.api.services.pubsub.model.Topic;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -42,10 +47,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.NullCredentialInitializer;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
 import org.joda.time.Duration;
 
@@ -196,10 +203,49 @@ public class ExampleUtils {
     }
   }
 
+  /**
+   * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
+   */
+  private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
+    return new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
+   */
+  private static Pubsub.Builder newPubsubClient(PubsubOptions options) {
+    return new Pubsub.Builder(Transport.getTransport(), Transport.getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setRootUrl(options.getPubsubRootUrl())
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return new ChainingHttpRequestInitializer(
+          new NullCredentialInitializer(), httpRequestInitializer);
+    } else {
+      return new ChainingHttpRequestInitializer(
+          new HttpCredentialsAdapter(credential),
+          httpRequestInitializer);
+    }
+  }
+
   private void setupBigQueryTable(String projectId, String datasetId, String tableId,
       TableSchema schema) throws IOException {
     if (bigQueryClient == null) {
-      bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+      bigQueryClient = newBigQueryClient(options.as(BigQueryOptions.class)).build();
     }
 
     Datasets datasetService = bigQueryClient.datasets();
@@ -224,7 +270,7 @@ public class ExampleUtils {
 
   private void setupPubsubTopic(String topic) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+      pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
       pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
@@ -233,7 +279,7 @@ public class ExampleUtils {
 
   private void setupPubsubSubscription(String topic, String subscription) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+      pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
       Subscription subInfo = new Subscription()
@@ -250,7 +296,7 @@ public class ExampleUtils {
    */
   private void deletePubsubTopic(String topic) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+      pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
       pubsubClient.projects().topics().delete(topic).execute();
@@ -264,7 +310,7 @@ public class ExampleUtils {
    */
   private void deletePubsubSubscription(String subscription) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+      pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
       pubsubClient.projects().subscriptions().delete(subscription).execute();

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index 15c261f..9dea55a 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -18,9 +18,9 @@
 
 package org.apache.beam.examples.cookbook;
 
-import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.BigqueryMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aec434e..dcca93b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -407,6 +407,13 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+        <classifier>tests</classifier>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 8689c3e..1c3891e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -20,8 +20,9 @@ package org.apache.beam.runners.dataflow.options;
 import java.io.IOException;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
@@ -29,7 +30,6 @@ import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.util.IOChannelUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/pom.xml b/sdks/java/extensions/gcp-core/pom.xml
index d566f94..20bd62a 100644
--- a/sdks/java/extensions/gcp-core/pom.xml
+++ b/sdks/java/extensions/gcp-core/pom.xml
@@ -109,16 +109,6 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-pubsub</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-bigquery</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-storage</artifactId>
     </dependency>
 
@@ -190,18 +180,6 @@
     </dependency>
 
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
@@ -210,8 +188,13 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
-
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
deleted file mode 100644
index 7672cd7..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties needed when using Google BigQuery with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google BigQuery. See "
-    + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-  @Description("Temporary dataset for BigQuery table operations. "
-      + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
-  @Default.String("bigquery.googleapis.com/cloud_dataflow")
-  String getTempDatasetId();
-  void setTempDatasetId(String value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
index 00be440..411121c 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
@@ -29,11 +29,9 @@ public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
   @Override
   public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
     return ImmutableList.<Class<? extends PipelineOptions>>builder()
-        .add(BigQueryOptions.class)
         .add(GcpOptions.class)
         .add(GcsOptions.class)
         .add(GoogleApiDebugOptions.class)
-        .add(PubsubOptions.class)
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
deleted file mode 100644
index b065d19..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google Cloud Pub/Sub. See "
-    + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
-public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-
-  /**
-   * Root URL for use with the Google Cloud Pub/Sub API.
-   */
-  @Description("Root URL for use with the Google Cloud Pub/Sub API")
-  @Default.String("https://pubsub.googleapis.com")
-  @Hidden
-  String getPubsubRootUrl();
-  void setPubsubRootUrl(String value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
deleted file mode 100644
index 8f752c0..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.BigqueryScopes;
-import com.google.api.services.bigquery.model.QueryRequest;
-import com.google.api.services.bigquery.model.QueryResponse;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Transport;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A matcher to verify data in BigQuery by processing given query
- * and comparing with content's checksum.
- *
- * <p>Example:
- * <pre>{@code [
- *   assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum));
- * ]}</pre>
- */
-@NotThreadSafe
-public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
-    implements SerializableMatcher<PipelineResult> {
-  private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class);
-
-  // The maximum number of retries to execute a BigQuery RPC
-  static final int MAX_QUERY_RETRIES = 4;
-
-  // The initial backoff for executing a BigQuery RPC
-  private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L);
-
-  // The total number of rows in query response to be formatted for debugging purpose
-  private static final int TOTAL_FORMATTED_ROWS = 20;
-
-  // The backoff factory with initial configs
-  static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT
-          .withMaxRetries(MAX_QUERY_RETRIES)
-          .withInitialBackoff(INITIAL_BACKOFF);
-
-  private final String applicationName;
-  private final String projectId;
-  private final String query;
-  private final String expectedChecksum;
-  private String actualChecksum;
-  private transient QueryResponse response;
-
-  public BigqueryMatcher(
-      String applicationName, String projectId, String query, String expectedChecksum) {
-    validateArgument("applicationName", applicationName);
-    validateArgument("projectId", projectId);
-    validateArgument("query", query);
-    validateArgument("expectedChecksum", expectedChecksum);
-
-    this.applicationName = applicationName;
-    this.projectId = projectId;
-    this.query = query;
-    this.expectedChecksum = expectedChecksum;
-  }
-
-  @Override
-  protected boolean matchesSafely(PipelineResult pipelineResult) {
-    LOG.info("Verifying Bigquery data");
-    Bigquery bigqueryClient = newBigqueryClient(applicationName);
-
-    // execute query
-    LOG.debug("Executing query: {}", query);
-    try {
-      QueryRequest queryContent = new QueryRequest();
-      queryContent.setQuery(query);
-
-      response = queryWithRetries(
-          bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
-    } catch (IOException | InterruptedException e) {
-      if (e instanceof InterruptedIOException) {
-        Thread.currentThread().interrupt();
-      }
-      throw new RuntimeException("Failed to fetch BigQuery data.", e);
-    }
-
-    if (!response.getJobComplete()) {
-      // query job not complete, verification failed
-      return false;
-    } else {
-      // compute checksum
-      actualChecksum = generateHash(response.getRows());
-      LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
-
-      return expectedChecksum.equals(actualChecksum);
-    }
-  }
-
-  @VisibleForTesting
-  Bigquery newBigqueryClient(String applicationName) {
-    HttpTransport transport = Transport.getTransport();
-    JsonFactory jsonFactory = Transport.getJsonFactory();
-    Credentials credential = getDefaultCredential();
-
-    return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential))
-        .setApplicationName(applicationName)
-        .build();
-  }
-
-  @Nonnull
-  @VisibleForTesting
-  QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
-                                 Sleeper sleeper, BackOff backOff)
-      throws IOException, InterruptedException {
-    IOException lastException = null;
-    do {
-      if (lastException != null) {
-        LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
-      }
-      try {
-        QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
-        if (response != null) {
-          return response;
-        } else {
-          lastException =
-              new IOException("Expected valid response from query job, but received null.");
-        }
-      } catch (IOException e) {
-        // ignore and retry
-        lastException = e;
-      }
-    } while(BackOffUtils.next(sleeper, backOff));
-
-    throw new RuntimeException(
-        String.format(
-            "Unable to get BigQuery response after retrying %d times using query (%s)",
-            MAX_QUERY_RETRIES,
-            queryContent.getQuery()),
-        lastException);
-  }
-
-  private void validateArgument(String name, String value) {
-    checkArgument(
-        !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value);
-  }
-
-  private Credentials getDefaultCredential() {
-    GoogleCredentials credential;
-    try {
-      credential = GoogleCredentials.getApplicationDefault();
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to get application default credential.", e);
-    }
-
-    if (credential.createScopedRequired()) {
-      Collection<String> bigqueryScope =
-          Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY);
-      credential = credential.createScoped(bigqueryScope);
-    }
-    return credential;
-  }
-
-  private String generateHash(@Nonnull List<TableRow> rows) {
-    List<HashCode> rowHashes = Lists.newArrayList();
-    for (TableRow row : rows) {
-      List<String> cellsInOneRow = Lists.newArrayList();
-      for (TableCell cell : row.getF()) {
-        cellsInOneRow.add(Objects.toString(cell.getV()));
-        Collections.sort(cellsInOneRow);
-      }
-      rowHashes.add(
-          Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
-    }
-    return Hashing.combineUnordered(rowHashes).toString();
-  }
-
-  @Override
-  public void describeTo(Description description) {
-    description
-        .appendText("Expected checksum is (")
-        .appendText(expectedChecksum)
-        .appendText(")");
-  }
-
-  @Override
-  public void describeMismatchSafely(PipelineResult pResult, Description description) {
-    String info;
-    if (!response.getJobComplete()) {
-      // query job not complete
-      info = String.format("The query job hasn't completed. Got response: %s", response);
-    } else {
-      // checksum mismatch
-      info = String.format("was (%s).%n"
-          + "\tTotal number of rows are: %d.%n"
-          + "\tQueried data details:%s",
-          actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS));
-    }
-    description.appendText(info);
-  }
-
-  private String formatRows(int totalNumRows) {
-    StringBuilder samples = new StringBuilder();
-    List<TableRow> rows = response.getRows();
-    for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
-      samples.append(String.format("%n\t\t"));
-      for (TableCell field : rows.get(i).getF()) {
-        samples.append(String.format("%-10s", field.getV()));
-      }
-    }
-    if (rows.size() > totalNumRows) {
-      samples.append(String.format("%n\t\t..."));
-    }
-    return samples.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
deleted file mode 100644
index 1494026..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines utilities for unit testing Google Cloud Platform components of Apache Beam pipelines.
- */
-package org.apache.beam.sdk.testing;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 80c093b..93d067a 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -22,9 +22,7 @@ import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.http.HttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.storage.Storage;
 import com.google.auth.Credentials;
 import com.google.auth.http.HttpCredentialsAdapter;
@@ -34,10 +32,8 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.security.GeneralSecurityException;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.CloudResourceManagerOptions;
 import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
 
 /**
  * Helpers for cloud communication.
@@ -91,38 +87,6 @@ public class Transport {
   }
 
   /**
-   * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
-   */
-  public static Bigquery.Builder
-      newBigQueryClient(BigQueryOptions options) {
-    return new Bigquery.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
-   *
-   * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory
-   */
-  @Deprecated
-  public static Pubsub.Builder
-      newPubsubClient(PubsubOptions options) {
-    return new Pubsub.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setRootUrl(options.getPubsubRootUrl())
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
    * Returns a CloudResourceManager client builder using the specified
    * {@link CloudResourceManagerOptions}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
index 37fb42d..50edd83 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
@@ -39,9 +39,7 @@ public class GcpCoreApiSurfaceTest {
         ImmutableSet.of(
             "org.apache.beam",
             "com.google.api.client",
-            "com.google.api.services.bigquery",
             "com.google.api.services.cloudresourcemanager",
-            "com.google.api.services.pubsub",
             "com.google.api.services.storage",
             "com.google.auth",
             "com.google.protobuf",

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
index dae7208..376972e 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Delete;
 import com.google.api.services.storage.Storage;
 import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.util.TestCredential;
@@ -112,8 +112,9 @@ public class GoogleApiDebugOptionsTest {
         Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("TraceDestination", getRequest.get("$trace"));
 
-    Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class))
-        .build().datasets().delete("testProjectId", "testDatasetId");
+    Delete deleteRequest =
+        Transport.newCloudResourceManagerClient(options.as(CloudResourceManagerOptions.class))
+            .build().projects().delete("testProjectId");
     assertNull(deleteRequest.get("$trace"));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
deleted file mode 100644
index 3b35856..0000000
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.QueryRequest;
-import com.google.api.services.bigquery.model.QueryResponse;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.math.BigInteger;
-import org.apache.beam.sdk.PipelineResult;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link BigqueryMatcher}.
- */
-@RunWith(JUnit4.class)
-public class BigqueryMatcherTest {
-  private final String appName = "test-app";
-  private final String projectId = "test-project";
-  private final String query = "test-query";
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
-  @Mock private Bigquery mockBigqueryClient;
-  @Mock private Bigquery.Jobs mockJobs;
-  @Mock private Bigquery.Jobs.Query mockQuery;
-  @Mock private PipelineResult mockResult;
-
-  @Before
-  public void setUp() throws IOException {
-    MockitoAnnotations.initMocks(this);
-    when(mockBigqueryClient.jobs()).thenReturn(mockJobs);
-    when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery);
-  }
-
-  @Test
-  public void testBigqueryMatcherThatSucceeds() throws Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(
-            appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
-    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
-    when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
-
-    assertThat(mockResult, matcher);
-    verify(matcher).newBigqueryClient(eq(appName));
-    verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
-  }
-
-  @Test
-  public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "incorrect-checksum"));
-    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
-    when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
-
-    thrown.expect(AssertionError.class);
-    thrown.expectMessage("Total number of rows are: 1");
-    thrown.expectMessage("abc");
-    try {
-      assertThat(mockResult, matcher);
-    } finally {
-      verify(matcher).newBigqueryClient(eq(appName));
-      verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
-    }
-  }
-
-  @Test
-  public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
-    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
-    when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false));
-
-    thrown.expect(AssertionError.class);
-    thrown.expectMessage("The query job hasn't completed.");
-    thrown.expectMessage("jobComplete=false");
-    try {
-      assertThat(mockResult, matcher);
-    } finally {
-      verify(matcher).newBigqueryClient(eq(appName));
-      verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
-    }
-  }
-
-  @Test
-  public void testQueryWithRetriesWhenServiceFails() throws Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
-    when(mockQuery.execute()).thenThrow(new IOException());
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to get BigQuery response after retrying");
-    try {
-      matcher.queryWithRetries(
-          mockBigqueryClient,
-          new QueryRequest(),
-          fastClock,
-          BigqueryMatcher.BACKOFF_FACTORY.backoff());
-    } finally {
-      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
-          .query(eq(projectId), eq(new QueryRequest()));
-    }
-  }
-
-  @Test
-  public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
-    when(mockQuery.execute()).thenReturn(null);
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to get BigQuery response after retrying");
-    try {
-      matcher.queryWithRetries(
-          mockBigqueryClient,
-          new QueryRequest(),
-          fastClock,
-          BigqueryMatcher.BACKOFF_FACTORY.backoff());
-    } finally {
-      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
-          .query(eq(projectId), eq(new QueryRequest()));
-    }
-  }
-
-  private QueryResponse createResponseContainingTestData() {
-    TableCell field1 = new TableCell();
-    field1.setV("abc");
-    TableCell field2 = new TableCell();
-    field2.setV("2");
-    TableCell field3 = new TableCell();
-    field3.setV("testing BigQuery matcher.");
-    TableRow row = new TableRow();
-    row.setF(Lists.newArrayList(field1, field2, field3));
-
-    QueryResponse response = new QueryResponse();
-    response.setJobComplete(true);
-    response.setRows(Lists.newArrayList(row));
-    response.setTotalRows(BigInteger.ONE);
-    return response;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
index 9a54254..9300fc7 100644
--- a/sdks/java/extensions/protobuf/pom.xml
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -115,18 +115,6 @@
     </dependency>
 
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
@@ -137,6 +125,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-  </dependencies>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 236b234..38a0f6c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index a13d61d..7ab0d73 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSche
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
new file mode 100644
index 0000000..06bab00
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/**
+ * Properties needed when using Google BigQuery with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google BigQuery. See "
+    + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
+public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
+    PipelineOptions, StreamingOptions {
+  @Description("Temporary dataset for BigQuery table operations. "
+      + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
+  @Default.String("bigquery.googleapis.com/cloud_dataflow")
+  String getTempDatasetId();
+  void setTempDatasetId(String value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 9153157..49da030 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index ebff6c1..1ae10bc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -33,7 +33,6 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.BigQueryOptions;
 
 /** An interface for real, mock, or fake implementations of Cloud BigQuery services. */
 interface BigQueryServices extends Serializable {

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c8e6ed8..0e0c365 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.ExponentialBackOff;
@@ -43,8 +44,12 @@ import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableDataList;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -56,11 +61,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.NullCredentialInitializer;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -117,7 +123,7 @@ class BigQueryServicesImpl implements BigQueryServices {
 
     private JobServiceImpl(BigQueryOptions options) {
       this.errorExtractor = new ApiErrorExtractor();
-      this.client = Transport.newBigQueryClient(options).build();
+      this.client = newBigQueryClient(options).build();
     }
 
     /**
@@ -379,7 +385,7 @@ class BigQueryServicesImpl implements BigQueryServices {
 
     private DatasetServiceImpl(BigQueryOptions bqOptions) {
       this.errorExtractor = new ApiErrorExtractor();
-      this.client = Transport.newBigQueryClient(bqOptions).build();
+      this.client = newBigQueryClient(bqOptions).build();
       this.options = bqOptions;
       this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
       this.executor = null;
@@ -827,13 +833,13 @@ class BigQueryServicesImpl implements BigQueryServices {
         BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
       return new BigQueryJsonReaderImpl(
           BigQueryTableRowIterator.fromQuery(
-              queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build()));
+              queryConfig, projectId, newBigQueryClient(bqOptions).build()));
     }
 
     private static BigQueryJsonReader fromTable(
         BigQueryOptions bqOptions, TableReference tableRef) {
       return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
-          tableRef, Transport.newBigQueryClient(bqOptions).build()));
+          tableRef, newBigQueryClient(bqOptions).build()));
     }
 
     @Override
@@ -921,4 +927,29 @@ class BigQueryServicesImpl implements BigQueryServices {
       throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
+   */
+  private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
+    return new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return new ChainingHttpRequestInitializer(
+          new NullCredentialInitializer(), httpRequestInitializer);
+    } else {
+      return new ChainingHttpRequestInitializer(
+          new HttpCredentialsAdapter(credential),
+          httpRequestInitializer);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index ab7f4e8..c7a6cca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index a28da92..5ec8b57 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -28,7 +28,6 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index a78f32d..a377af7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
index a8bdb43..73d8eb7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -21,7 +21,6 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.base.Strings;
 import java.io.IOException;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 22b2078..2b4cd71 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SystemDoFnInternal;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 9b1c989..f575a3d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 4a6cd2b..f336849 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..2abff07
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.common;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * A registrar containing the Google Cloud Platform IO options (BigQuery and Pub/Sub).
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class GcpIoPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>builder()
+        .add(BigQueryOptions.class)
+        .add(PubsubOptions.class)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java
new file mode 100644
index 0000000..5459534
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines common Google Cloud Platform IO support classes. */
+package org.apache.beam.sdk.io.gcp.common;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 750178c..3a69799 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
 
 /**
  * An (abstract) helper class for talking to Pubsub via an underlying transport.

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 912d59c..1d02a1e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -62,7 +62,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
 
 /**
  * A helper class for talking to Pubsub via grpc.

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 8fc1c19..f0926d4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index e290a6b..39184fb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -47,7 +47,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java
new file mode 100644
index 0000000..6158584
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/**
+ * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google Cloud Pub/Sub. See "
+    + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
+public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
+    PipelineOptions, StreamingOptions {
+
+  /**
+   * Root URL for use with the Google Cloud Pub/Sub API.
+   */
+  @Description("Root URL for use with the Google Cloud Pub/Sub API")
+  @Default.String("https://pubsub.googleapis.com")
+  @Hidden
+  String getPubsubRootUrl();
+  void setPubsubRootUrl(String value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index c88576e..9d40e41 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
 
 /**
  * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index f1dc1e8..cf43ae6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 558b944..4979939 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -61,7 +61,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java
new file mode 100644
index 0000000..27b28ef
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines utilities for unit testing Google Cloud Platform components of Apache Beam pipelines.
+ */
+package org.apache.beam.sdk.io.gcp.testing;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index f468ec0..7025004 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 import com.google.common.collect.ImmutableSet;
 import java.util.Set;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
 import org.apache.beam.sdk.util.ApiSurface;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -42,6 +43,7 @@ public class GcpApiSurfaceTest {
 
     final ApiSurface apiSurface =
         ApiSurface.ofPackage(thisPackage, thisClassLoader)
+            .pruningPattern(BigqueryMatcher.class.getName())
             .pruningPattern("org[.]apache[.]beam[.].*Test.*")
             .pruningPattern("org[.]apache[.]beam[.].*IT")
             .pruningPattern("java[.]lang.*")

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index e0a5fac..a46c1fe 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -81,7 +81,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
-import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index 43ad238..367aeb7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -33,7 +33,6 @@ import java.util.NoSuchElementException;
 
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.options.BigQueryOptions;
 
 
 /**