You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/25 22:20:05 UTC
[2/3] beam git commit: [BEAM-1871] Move Bigquery/Pubsub options to sdks/java/io/google-cloud-platform
[BEAM-1871] Move Bigquery/Pubsub options to sdks/java/io/google-cloud-platform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f9f7bcd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f9f7bcd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f9f7bcd
Branch: refs/heads/master
Commit: 6f9f7bcd914d18eee4e00bd16d69c4d53636e244
Parents: 652a919
Author: Luke Cwik <lc...@google.com>
Authored: Mon Apr 24 15:58:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Apr 25 15:19:44 2017 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 22 ++
.../beam/examples/common/ExampleUtils.java | 60 ++++-
.../examples/cookbook/BigQueryTornadoesIT.java | 4 +-
pom.xml | 7 +
.../options/DataflowPipelineOptions.java | 4 +-
sdks/java/extensions/gcp-core/pom.xml | 29 +--
.../beam/sdk/options/BigQueryOptions.java | 32 ---
.../options/GcpPipelineOptionsRegistrar.java | 2 -
.../apache/beam/sdk/options/PubsubOptions.java | 36 ---
.../beam/sdk/testing/BigqueryMatcher.java | 256 ------------------
.../apache/beam/sdk/testing/package-info.java | 21 --
.../org/apache/beam/sdk/util/Transport.java | 36 ---
.../org/apache/beam/GcpCoreApiSurfaceTest.java | 2 -
.../sdk/options/GoogleApiDebugOptionsTest.java | 7 +-
.../beam/sdk/testing/BigqueryMatcherTest.java | 176 -------------
sdks/java/extensions/protobuf/pom.xml | 19 +-
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 1 -
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1 -
.../sdk/io/gcp/bigquery/BigQueryOptions.java | 39 +++
.../io/gcp/bigquery/BigQueryQuerySource.java | 1 -
.../sdk/io/gcp/bigquery/BigQueryServices.java | 1 -
.../io/gcp/bigquery/BigQueryServicesImpl.java | 41 ++-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 1 -
.../io/gcp/bigquery/BigQueryTableSource.java | 1 -
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 1 -
.../beam/sdk/io/gcp/bigquery/PrepareWrite.java | 1 -
.../sdk/io/gcp/bigquery/StreamingWriteFn.java | 1 -
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 1 -
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 1 -
.../common/GcpIoPipelineOptionsRegistrar.java | 39 +++
.../beam/sdk/io/gcp/common/package-info.java | 20 ++
.../beam/sdk/io/gcp/pubsub/PubsubClient.java | 1 -
.../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 1 -
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 1 -
.../sdk/io/gcp/pubsub/PubsubJsonClient.java | 1 -
.../beam/sdk/io/gcp/pubsub/PubsubOptions.java | 44 ++++
.../sdk/io/gcp/pubsub/PubsubTestClient.java | 1 -
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 1 -
.../io/gcp/pubsub/PubsubUnboundedSource.java | 1 -
.../beam/sdk/io/gcp/testing/package-info.java | 21 ++
.../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 2 +
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 1 -
.../io/gcp/bigquery/FakeBigQueryServices.java | 1 -
.../sdk/io/gcp/testing/BigqueryMatcher.java | 257 +++++++++++++++++++
.../sdk/io/gcp/testing/BigqueryMatcherTest.java | 177 +++++++++++++
45 files changed, 737 insertions(+), 636 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index ae3d63d..9317136 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -485,6 +485,21 @@
</dependency>
<dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>util</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-oauth2-http</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-credentials</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
@@ -552,6 +567,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 6962571..2650f8e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
@@ -33,6 +34,10 @@ import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -42,10 +47,12 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.NullCredentialInitializer;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
@@ -196,10 +203,49 @@ public class ExampleUtils {
}
}
+ /**
+ * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
+ */
+ private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
+ return new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ /**
+ * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
+ */
+ private static Pubsub.Builder newPubsubClient(PubsubOptions options) {
+ return new Pubsub.Builder(Transport.getTransport(), Transport.getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setRootUrl(options.getPubsubRootUrl())
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ private static HttpRequestInitializer chainHttpRequestInitializer(
+ Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+ if (credential == null) {
+ return new ChainingHttpRequestInitializer(
+ new NullCredentialInitializer(), httpRequestInitializer);
+ } else {
+ return new ChainingHttpRequestInitializer(
+ new HttpCredentialsAdapter(credential),
+ httpRequestInitializer);
+ }
+ }
+
private void setupBigQueryTable(String projectId, String datasetId, String tableId,
TableSchema schema) throws IOException {
if (bigQueryClient == null) {
- bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+ bigQueryClient = newBigQueryClient(options.as(BigQueryOptions.class)).build();
}
Datasets datasetService = bigQueryClient.datasets();
@@ -224,7 +270,7 @@ public class ExampleUtils {
private void setupPubsubTopic(String topic) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
@@ -233,7 +279,7 @@ public class ExampleUtils {
private void setupPubsubSubscription(String topic, String subscription) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
Subscription subInfo = new Subscription()
@@ -250,7 +296,7 @@ public class ExampleUtils {
*/
private void deletePubsubTopic(String topic) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
pubsubClient.projects().topics().delete(topic).execute();
@@ -264,7 +310,7 @@ public class ExampleUtils {
*/
private void deletePubsubSubscription(String subscription) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
pubsubClient.projects().subscriptions().delete(subscription).execute();
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index 15c261f..9dea55a 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -18,9 +18,9 @@
package org.apache.beam.examples.cookbook;
-import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.BigqueryMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aec434e..dcca93b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -407,6 +407,13 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-common</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 8689c3e..1c3891e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -20,8 +20,9 @@ package org.apache.beam.runners.dataflow.options;
import java.io.IOException;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
@@ -29,7 +30,6 @@ import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.util.IOChannelUtils;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/pom.xml b/sdks/java/extensions/gcp-core/pom.xml
index d566f94..20bd62a 100644
--- a/sdks/java/extensions/gcp-core/pom.xml
+++ b/sdks/java/extensions/gcp-core/pom.xml
@@ -109,16 +109,6 @@
<dependency>
<groupId>com.google.apis</groupId>
- <artifactId>google-api-services-pubsub</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-bigquery</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
<artifactId>google-api-services-storage</artifactId>
</dependency>
@@ -190,18 +180,6 @@
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
@@ -210,8 +188,13 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
-
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
deleted file mode 100644
index 7672cd7..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties needed when using Google BigQuery with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google BigQuery. See "
- + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
- @Description("Temporary dataset for BigQuery table operations. "
- + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
- @Default.String("bigquery.googleapis.com/cloud_dataflow")
- String getTempDatasetId();
- void setTempDatasetId(String value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
index 00be440..411121c 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
@@ -29,11 +29,9 @@ public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.<Class<? extends PipelineOptions>>builder()
- .add(BigQueryOptions.class)
.add(GcpOptions.class)
.add(GcsOptions.class)
.add(GoogleApiDebugOptions.class)
- .add(PubsubOptions.class)
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
deleted file mode 100644
index b065d19..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google Cloud Pub/Sub. See "
- + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
-public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
-
- /**
- * Root URL for use with the Google Cloud Pub/Sub API.
- */
- @Description("Root URL for use with the Google Cloud Pub/Sub API")
- @Default.String("https://pubsub.googleapis.com")
- @Hidden
- String getPubsubRootUrl();
- void setPubsubRootUrl(String value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
deleted file mode 100644
index 8f752c0..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.BigqueryScopes;
-import com.google.api.services.bigquery.model.QueryRequest;
-import com.google.api.services.bigquery.model.QueryResponse;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Transport;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A matcher to verify data in BigQuery by processing given query
- * and comparing with content's checksum.
- *
- * <p>Example:
- * <pre>{@code [
- * assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum));
- * ]}</pre>
- */
-@NotThreadSafe
-public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
- implements SerializableMatcher<PipelineResult> {
- private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class);
-
- // The maximum number of retries to execute a BigQuery RPC
- static final int MAX_QUERY_RETRIES = 4;
-
- // The initial backoff for executing a BigQuery RPC
- private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L);
-
- // The total number of rows in query response to be formatted for debugging purpose
- private static final int TOTAL_FORMATTED_ROWS = 20;
-
- // The backoff factory with initial configs
- static final FluentBackoff BACKOFF_FACTORY =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_QUERY_RETRIES)
- .withInitialBackoff(INITIAL_BACKOFF);
-
- private final String applicationName;
- private final String projectId;
- private final String query;
- private final String expectedChecksum;
- private String actualChecksum;
- private transient QueryResponse response;
-
- public BigqueryMatcher(
- String applicationName, String projectId, String query, String expectedChecksum) {
- validateArgument("applicationName", applicationName);
- validateArgument("projectId", projectId);
- validateArgument("query", query);
- validateArgument("expectedChecksum", expectedChecksum);
-
- this.applicationName = applicationName;
- this.projectId = projectId;
- this.query = query;
- this.expectedChecksum = expectedChecksum;
- }
-
- @Override
- protected boolean matchesSafely(PipelineResult pipelineResult) {
- LOG.info("Verifying Bigquery data");
- Bigquery bigqueryClient = newBigqueryClient(applicationName);
-
- // execute query
- LOG.debug("Executing query: {}", query);
- try {
- QueryRequest queryContent = new QueryRequest();
- queryContent.setQuery(query);
-
- response = queryWithRetries(
- bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedIOException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException("Failed to fetch BigQuery data.", e);
- }
-
- if (!response.getJobComplete()) {
- // query job not complete, verification failed
- return false;
- } else {
- // compute checksum
- actualChecksum = generateHash(response.getRows());
- LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
-
- return expectedChecksum.equals(actualChecksum);
- }
- }
-
- @VisibleForTesting
- Bigquery newBigqueryClient(String applicationName) {
- HttpTransport transport = Transport.getTransport();
- JsonFactory jsonFactory = Transport.getJsonFactory();
- Credentials credential = getDefaultCredential();
-
- return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential))
- .setApplicationName(applicationName)
- .build();
- }
-
- @Nonnull
- @VisibleForTesting
- QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
- Sleeper sleeper, BackOff backOff)
- throws IOException, InterruptedException {
- IOException lastException = null;
- do {
- if (lastException != null) {
- LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
- }
- try {
- QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
- if (response != null) {
- return response;
- } else {
- lastException =
- new IOException("Expected valid response from query job, but received null.");
- }
- } catch (IOException e) {
- // ignore and retry
- lastException = e;
- }
- } while(BackOffUtils.next(sleeper, backOff));
-
- throw new RuntimeException(
- String.format(
- "Unable to get BigQuery response after retrying %d times using query (%s)",
- MAX_QUERY_RETRIES,
- queryContent.getQuery()),
- lastException);
- }
-
- private void validateArgument(String name, String value) {
- checkArgument(
- !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value);
- }
-
- private Credentials getDefaultCredential() {
- GoogleCredentials credential;
- try {
- credential = GoogleCredentials.getApplicationDefault();
- } catch (IOException e) {
- throw new RuntimeException("Failed to get application default credential.", e);
- }
-
- if (credential.createScopedRequired()) {
- Collection<String> bigqueryScope =
- Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY);
- credential = credential.createScoped(bigqueryScope);
- }
- return credential;
- }
-
- private String generateHash(@Nonnull List<TableRow> rows) {
- List<HashCode> rowHashes = Lists.newArrayList();
- for (TableRow row : rows) {
- List<String> cellsInOneRow = Lists.newArrayList();
- for (TableCell cell : row.getF()) {
- cellsInOneRow.add(Objects.toString(cell.getV()));
- Collections.sort(cellsInOneRow);
- }
- rowHashes.add(
- Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
- }
- return Hashing.combineUnordered(rowHashes).toString();
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("Expected checksum is (")
- .appendText(expectedChecksum)
- .appendText(")");
- }
-
- @Override
- public void describeMismatchSafely(PipelineResult pResult, Description description) {
- String info;
- if (!response.getJobComplete()) {
- // query job not complete
- info = String.format("The query job hasn't completed. Got response: %s", response);
- } else {
- // checksum mismatch
- info = String.format("was (%s).%n"
- + "\tTotal number of rows are: %d.%n"
- + "\tQueried data details:%s",
- actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS));
- }
- description.appendText(info);
- }
-
- private String formatRows(int totalNumRows) {
- StringBuilder samples = new StringBuilder();
- List<TableRow> rows = response.getRows();
- for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
- samples.append(String.format("%n\t\t"));
- for (TableCell field : rows.get(i).getF()) {
- samples.append(String.format("%-10s", field.getV()));
- }
- }
- if (rows.size() > totalNumRows) {
- samples.append(String.format("%n\t\t..."));
- }
- return samples.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
deleted file mode 100644
index 1494026..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines utilities for unit testing Google Cloud Platform components of Apache Beam pipelines.
- */
-package org.apache.beam.sdk.testing;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 80c093b..93d067a 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -22,9 +22,7 @@ import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.storage.Storage;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
@@ -34,10 +32,8 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.CloudResourceManagerOptions;
import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
/**
* Helpers for cloud communication.
@@ -91,38 +87,6 @@ public class Transport {
}
/**
- * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
- */
- public static Bigquery.Builder
- newBigQueryClient(BigQueryOptions options) {
- return new Bigquery.Builder(getTransport(), getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
- *
- * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory
- */
- @Deprecated
- public static Pubsub.Builder
- newPubsubClient(PubsubOptions options) {
- return new Pubsub.Builder(getTransport(), getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setRootUrl(options.getPubsubRootUrl())
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
* Returns a CloudResourceManager client builder using the specified
* {@link CloudResourceManagerOptions}.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
index 37fb42d..50edd83 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
@@ -39,9 +39,7 @@ public class GcpCoreApiSurfaceTest {
ImmutableSet.of(
"org.apache.beam",
"com.google.api.client",
- "com.google.api.services.bigquery",
"com.google.api.services.cloudresourcemanager",
- "com.google.api.services.pubsub",
"com.google.api.services.storage",
"com.google.auth",
"com.google.protobuf",
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
index dae7208..376972e 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Delete;
import com.google.api.services.storage.Storage;
import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.util.TestCredential;
@@ -112,8 +112,9 @@ public class GoogleApiDebugOptionsTest {
Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
assertEquals("TraceDestination", getRequest.get("$trace"));
- Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class))
- .build().datasets().delete("testProjectId", "testDatasetId");
+ Delete deleteRequest =
+ Transport.newCloudResourceManagerClient(options.as(CloudResourceManagerOptions.class))
+ .build().projects().delete("testProjectId");
assertNull(deleteRequest.get("$trace"));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
deleted file mode 100644
index 3b35856..0000000
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.QueryRequest;
-import com.google.api.services.bigquery.model.QueryResponse;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.math.BigInteger;
-import org.apache.beam.sdk.PipelineResult;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link BigqueryMatcher}.
- */
-@RunWith(JUnit4.class)
-public class BigqueryMatcherTest {
- private final String appName = "test-app";
- private final String projectId = "test-project";
- private final String query = "test-query";
-
- @Rule public ExpectedException thrown = ExpectedException.none();
- @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
- @Mock private Bigquery mockBigqueryClient;
- @Mock private Bigquery.Jobs mockJobs;
- @Mock private Bigquery.Jobs.Query mockQuery;
- @Mock private PipelineResult mockResult;
-
- @Before
- public void setUp() throws IOException {
- MockitoAnnotations.initMocks(this);
- when(mockBigqueryClient.jobs()).thenReturn(mockJobs);
- when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery);
- }
-
- @Test
- public void testBigqueryMatcherThatSucceeds() throws Exception {
- BigqueryMatcher matcher = spy(
- new BigqueryMatcher(
- appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
- doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
- when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
-
- assertThat(mockResult, matcher);
- verify(matcher).newBigqueryClient(eq(appName));
- verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
- }
-
- @Test
- public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException {
- BigqueryMatcher matcher = spy(
- new BigqueryMatcher(appName, projectId, query, "incorrect-checksum"));
- doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
- when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
-
- thrown.expect(AssertionError.class);
- thrown.expectMessage("Total number of rows are: 1");
- thrown.expectMessage("abc");
- try {
- assertThat(mockResult, matcher);
- } finally {
- verify(matcher).newBigqueryClient(eq(appName));
- verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
- }
- }
-
- @Test
- public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception {
- BigqueryMatcher matcher = spy(
- new BigqueryMatcher(appName, projectId, query, "some-checksum"));
- doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
- when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false));
-
- thrown.expect(AssertionError.class);
- thrown.expectMessage("The query job hasn't completed.");
- thrown.expectMessage("jobComplete=false");
- try {
- assertThat(mockResult, matcher);
- } finally {
- verify(matcher).newBigqueryClient(eq(appName));
- verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
- }
- }
-
- @Test
- public void testQueryWithRetriesWhenServiceFails() throws Exception {
- BigqueryMatcher matcher = spy(
- new BigqueryMatcher(appName, projectId, query, "some-checksum"));
- when(mockQuery.execute()).thenThrow(new IOException());
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable to get BigQuery response after retrying");
- try {
- matcher.queryWithRetries(
- mockBigqueryClient,
- new QueryRequest(),
- fastClock,
- BigqueryMatcher.BACKOFF_FACTORY.backoff());
- } finally {
- verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
- .query(eq(projectId), eq(new QueryRequest()));
- }
- }
-
- @Test
- public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
- BigqueryMatcher matcher = spy(
- new BigqueryMatcher(appName, projectId, query, "some-checksum"));
- when(mockQuery.execute()).thenReturn(null);
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable to get BigQuery response after retrying");
- try {
- matcher.queryWithRetries(
- mockBigqueryClient,
- new QueryRequest(),
- fastClock,
- BigqueryMatcher.BACKOFF_FACTORY.backoff());
- } finally {
- verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
- .query(eq(projectId), eq(new QueryRequest()));
- }
- }
-
- private QueryResponse createResponseContainingTestData() {
- TableCell field1 = new TableCell();
- field1.setV("abc");
- TableCell field2 = new TableCell();
- field2.setV("2");
- TableCell field3 = new TableCell();
- field3.setV("testing BigQuery matcher.");
- TableRow row = new TableRow();
- row.setF(Lists.newArrayList(field1, field2, field3));
-
- QueryResponse response = new QueryResponse();
- response.setJobComplete(true);
- response.setRows(Lists.newArrayList(row));
- response.setTotalRows(BigInteger.ONE);
- return response;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
index 9a54254..9300fc7 100644
--- a/sdks/java/extensions/protobuf/pom.xml
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -115,18 +115,6 @@
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
@@ -137,6 +125,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 236b234..38a0f6c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index a13d61d..7ab0d73 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSche
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
new file mode 100644
index 0000000..06bab00
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/**
+ * Properties needed when using Google BigQuery with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google BigQuery. See "
+ + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
+public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
+ PipelineOptions, StreamingOptions {
+ @Description("Temporary dataset for BigQuery table operations. "
+ + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
+ @Default.String("bigquery.googleapis.com/cloud_dataflow")
+ String getTempDatasetId();
+ void setTempDatasetId(String value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 9153157..49da030 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index ebff6c1..1ae10bc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -33,7 +33,6 @@ import java.io.Serializable;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.BigQueryOptions;
/** An interface for real, mock, or fake implementations of Cloud BigQuery services. */
interface BigQueryServices extends Serializable {
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c8e6ed8..0e0c365 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ExponentialBackOff;
@@ -43,8 +44,12 @@ import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -56,11 +61,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.NullCredentialInitializer;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -117,7 +123,7 @@ class BigQueryServicesImpl implements BigQueryServices {
private JobServiceImpl(BigQueryOptions options) {
this.errorExtractor = new ApiErrorExtractor();
- this.client = Transport.newBigQueryClient(options).build();
+ this.client = newBigQueryClient(options).build();
}
/**
@@ -379,7 +385,7 @@ class BigQueryServicesImpl implements BigQueryServices {
private DatasetServiceImpl(BigQueryOptions bqOptions) {
this.errorExtractor = new ApiErrorExtractor();
- this.client = Transport.newBigQueryClient(bqOptions).build();
+ this.client = newBigQueryClient(bqOptions).build();
this.options = bqOptions;
this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
this.executor = null;
@@ -827,13 +833,13 @@ class BigQueryServicesImpl implements BigQueryServices {
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
return new BigQueryJsonReaderImpl(
BigQueryTableRowIterator.fromQuery(
- queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build()));
+ queryConfig, projectId, newBigQueryClient(bqOptions).build()));
}
private static BigQueryJsonReader fromTable(
BigQueryOptions bqOptions, TableReference tableRef) {
return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
- tableRef, Transport.newBigQueryClient(bqOptions).build()));
+ tableRef, newBigQueryClient(bqOptions).build()));
}
@Override
@@ -921,4 +927,29 @@ class BigQueryServicesImpl implements BigQueryServices {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
+ */
+ private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
+ return new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ private static HttpRequestInitializer chainHttpRequestInitializer(
+ Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+ if (credential == null) {
+ return new ChainingHttpRequestInitializer(
+ new NullCredentialInitializer(), httpRequestInitializer);
+ } else {
+ return new ChainingHttpRequestInitializer(
+ new HttpCredentialsAdapter(credential),
+ httpRequestInitializer);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index ab7f4e8..c7a6cca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index a28da92..5ec8b57 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -28,7 +28,6 @@ import com.google.common.base.Strings;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index a78f32d..a377af7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
index a8bdb43..73d8eb7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -21,7 +21,6 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.Strings;
import java.io.IOException;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 22b2078..2b4cd71 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SystemDoFnInternal;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 9b1c989..f575a3d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionView;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 4a6cd2b..f336849 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..2abff07
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.common;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * Registers the GCP IO pipeline options: {@link BigQueryOptions} and {@link PubsubOptions}.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class GcpIoPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>builder()
+ .add(BigQueryOptions.class)
+ .add(PubsubOptions.class)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java
new file mode 100644
index 0000000..5459534
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines common Google Cloud Platform IO support classes. */
+package org.apache.beam.sdk.io.gcp.common;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 750178c..3a69799 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
/**
* An (abstract) helper class for talking to Pubsub via an underlying transport.
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 912d59c..1d02a1e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -62,7 +62,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
/**
* A helper class for talking to Pubsub via grpc.
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 8fc1c19..f0926d4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index e290a6b..39184fb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -47,7 +47,6 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java
new file mode 100644
index 0000000..6158584
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/**
+ * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google Cloud Pub/Sub. See "
+ + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
+public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
+ PipelineOptions, StreamingOptions {
+
+ /**
+ * Root URL of the Cloud Pub/Sub API; hidden, defaults to {@code https://pubsub.googleapis.com}.
+ */
+ @Description("Root URL for use with the Google Cloud Pub/Sub API")
+ @Default.String("https://pubsub.googleapis.com")
+ @Hidden
+ String getPubsubRootUrl();
+ void setPubsubRootUrl(String value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index c88576e..9d40e41 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
/**
* A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index f1dc1e8..cf43ae6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 558b944..4979939 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -61,7 +61,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java
new file mode 100644
index 0000000..27b28ef
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines utilities for unit testing Google Cloud Platform components of Apache Beam pipelines.
+ */
+package org.apache.beam.sdk.io.gcp.testing;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index f468ec0..7025004 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
import org.apache.beam.sdk.util.ApiSurface;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@@ -42,6 +43,7 @@ public class GcpApiSurfaceTest {
final ApiSurface apiSurface =
ApiSurface.ofPackage(thisPackage, thisClassLoader)
+ .pruningPattern(BigqueryMatcher.class.getName())
.pruningPattern("org[.]apache[.]beam[.].*Test.*")
.pruningPattern("org[.]apache[.]beam[.].*IT")
.pruningPattern("java[.]lang.*")
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index e0a5fac..a46c1fe 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -81,7 +81,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
-import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index 43ad238..367aeb7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -33,7 +33,6 @@ import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.options.BigQueryOptions;
/**