You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/27 17:51:30 UTC
[3/4] beam git commit: [BEAM-1871] Move several options/auth classes around in gcp-core
[BEAM-1871] Move several options/auth classes around in gcp-core
Note that I kept a duplicate of GcsOptions because of its direct usage within the Dataflow Worker.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0aed801a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0aed801a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0aed801a
Branch: refs/heads/master
Commit: 0aed801acb1dd0709d4dc0dc9ad9b94ca1c882d2
Parents: 0c740f4
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 18:23:23 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Apr 27 10:50:45 2017 -0700
----------------------------------------------------------------------
.../common/ExampleBigQueryTableOptions.java | 2 +-
...xamplePubsubTopicAndSubscriptionOptions.java | 2 +-
.../common/ExamplePubsubTopicOptions.java | 2 +-
.../beam/examples/common/ExampleUtils.java | 2 +-
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../complete/game/utils/WriteToBigQuery.java | 2 +-
.../examples/MinimalWordCountJava8Test.java | 2 +-
pom.xml | 7 +
runners/google-cloud-dataflow-java/pom.xml | 7 +
.../options/DataflowPipelineOptions.java | 4 +-
.../dataflow/util/DataflowTransport.java | 2 +-
.../BatchStatefulParDoOverridesTest.java | 2 +-
.../runners/dataflow/DataflowMetricsTest.java | 2 +-
.../dataflow/DataflowPipelineJobTest.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 4 +-
.../testing/TestDataflowRunnerTest.java | 2 +-
.../dataflow/util/MonitoringUtilTest.java | 2 +-
.../runners/dataflow/util/PackageUtilTest.java | 2 +-
.../extensions/gcp/auth/CredentialFactory.java | 29 +++
.../gcp/auth/GcpCredentialFactory.java | 67 ++++++
.../gcp/auth/NoopCredentialFactory.java | 68 ++++++
.../gcp/auth/NullCredentialInitializer.java | 62 +++++
.../sdk/extensions/gcp/auth/package-info.java | 22 ++
.../options/CloudResourceManagerOptions.java | 46 ++++
.../sdk/extensions/gcp/options/GcpOptions.java | 231 +++++++++++++++++++
.../options/GcpPipelineOptionsRegistrar.java | 39 ++++
.../sdk/extensions/gcp/options/GcsOptions.java | 160 +++++++++++++
.../gcp/options/GoogleApiDebugOptions.java | 89 +++++++
.../extensions/gcp/options/package-info.java | 22 ++
.../options/CloudResourceManagerOptions.java | 40 ----
.../org/apache/beam/sdk/options/GcpOptions.java | 227 ------------------
.../options/GcpPipelineOptionsRegistrar.java | 37 ---
.../org/apache/beam/sdk/options/GcsOptions.java | 56 +----
.../beam/sdk/options/GoogleApiDebugOptions.java | 87 -------
.../apache/beam/sdk/util/CredentialFactory.java | 29 ---
.../org/apache/beam/sdk/util/DefaultBucket.java | 4 +-
.../beam/sdk/util/GcpCredentialFactory.java | 67 ------
.../apache/beam/sdk/util/GcpProjectUtil.java | 2 +-
.../beam/sdk/util/GcsIOChannelFactory.java | 2 +-
.../apache/beam/sdk/util/GcsPathValidator.java | 2 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 3 +-
.../beam/sdk/util/NoopCredentialFactory.java | 68 ------
.../sdk/util/NullCredentialInitializer.java | 62 -----
.../apache/beam/sdk/util/TestCredential.java | 59 -----
.../org/apache/beam/sdk/util/Transport.java | 5 +-
.../org/apache/beam/GcpCoreApiSurfaceTest.java | 60 -----
.../extensions/gcp/GcpCoreApiSurfaceTest.java | 60 +++++
.../sdk/extensions/gcp/auth/TestCredential.java | 59 +++++
.../extensions/gcp/options/GcpOptionsTest.java | 172 ++++++++++++++
.../gcp/options/GoogleApiDebugOptionsTest.java | 147 ++++++++++++
.../apache/beam/sdk/options/GcpOptionsTest.java | 171 --------------
.../sdk/options/GoogleApiDebugOptionsTest.java | 146 ------------
.../apache/beam/sdk/util/DefaultBucketTest.java | 6 +-
.../beam/sdk/util/GcpProjectUtilTest.java | 3 +-
.../beam/sdk/util/GcsIOChannelFactoryTest.java | 2 +-
.../beam/sdk/util/GcsPathValidatorTest.java | 3 +-
.../org/apache/beam/sdk/util/GcsUtilTest.java | 3 +-
.../org/apache/beam/fn/harness/FnHarness.java | 2 +-
.../fn/harness/logging/BeamFnLoggingClient.java | 2 +-
.../harness/stream/StreamObserverFactory.java | 2 +-
.../apache/beam/fn/harness/FnHarnessTest.java | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 7 +
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +-
.../sdk/io/gcp/bigquery/BigQueryOptions.java | 2 +-
.../io/gcp/bigquery/BigQueryServicesImpl.java | 4 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +-
.../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 2 +-
.../beam/sdk/io/gcp/pubsub/PubsubOptions.java | 2 +-
.../beam/sdk/io/gcp/storage/GcsFileSystem.java | 2 +-
.../io/gcp/storage/GcsFileSystemRegistrar.java | 2 +-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 4 +-
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 2 +-
.../beam/sdk/io/gcp/datastore/V1ReadIT.java | 2 +-
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 2 +-
.../beam/sdk/io/gcp/datastore/V1WriteIT.java | 2 +-
.../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 2 +-
.../sdk/io/gcp/storage/GcsFileSystemTest.java | 2 +-
79 files changed, 1360 insertions(+), 1160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
index 54502cb..c5216e6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
@@ -18,10 +18,10 @@
package org.apache.beam.examples.common;
import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
index c64681c..6cffad2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -17,10 +17,10 @@
*/
package org.apache.beam.examples.common;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
index 65594d7..b594a66 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
@@ -17,10 +17,10 @@
*/
package org.apache.beam.examples.common;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 2650f8e..6ac37fd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -47,11 +47,11 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.NullCredentialInitializer;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index f7904d3..2e1be90 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -28,10 +28,10 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 5eecddb..f767d21 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -27,10 +27,10 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 6c66d8f..f3becf9 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -26,8 +26,8 @@ import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 52d2b40..f96c14d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -372,6 +372,13 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-sorter</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 75aac43..e95f4fc 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -377,6 +377,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 1c3891e..0796b6d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -20,14 +20,14 @@ package org.apache.beam.runners.dataflow.options;
import java.io.IOException;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
index 7f3b6c7..b28b8d3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
@@ -30,7 +30,7 @@ import com.google.common.collect.ImmutableList;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.util.NullCredentialInitializer;
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index f995ff3..ce7f678 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -38,13 +38,13 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateSpecs;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index ddb719c..aabdd84 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -39,10 +39,10 @@ import java.io.IOException;
import java.math.BigDecimal;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 9dd2ab1..e1235b9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -45,13 +45,13 @@ import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8185623..cf0cae4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -80,7 +81,6 @@ import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.util.state.StateSpec;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 36704bc..433fb77 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -66,6 +66,8 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.TextIO.Read;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -79,10 +81,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.NoopCredentialFactory;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 307393c..80fbfe5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
@@ -60,7 +61,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.BaseMatcher;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index 24b6c4e..c048776 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -31,9 +31,9 @@ import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler;
import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.util.TestCredential;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.chrono.ISOChronology;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index a11872f..877832c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -69,7 +69,7 @@ import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
-import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
new file mode 100644
index 0000000..6ab7b14
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+/**
+ * Construct an oauth credential to be used by the SDK and the SDK workers.
+ */
+public interface CredentialFactory {
+ Credentials getCredential() throws IOException, GeneralSecurityException;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
new file mode 100644
index 0000000..f999c63
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Construct an oauth credential to be used by the SDK and the SDK workers.
+ * Returns a GCP credential.
+ */
+public class GcpCredentialFactory implements CredentialFactory {
+ /**
+ * The scope cloud-platform provides access to all Cloud Platform resources.
+ * cloud-platform isn't sufficient yet for talking to datastore so we request
+ * those resources separately.
+ *
+ * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for
+ * services we access directly (GCS) as opposed to through the backend
+ * (BigQuery, GCE), we need to explicitly request that scope.
+ */
+ private static final List<String> SCOPES = Arrays.asList(
+ "https://www.googleapis.com/auth/cloud-platform",
+ "https://www.googleapis.com/auth/devstorage.full_control",
+ "https://www.googleapis.com/auth/userinfo.email",
+ "https://www.googleapis.com/auth/datastore",
+ "https://www.googleapis.com/auth/pubsub");
+
+ private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory();
+
+ public static GcpCredentialFactory fromOptions(PipelineOptions options) {
+ return INSTANCE;
+ }
+
+ /**
+ * Returns a default GCP {@link Credentials} or null when it fails.
+ */
+ @Override
+ public Credentials getCredential() {
+ try {
+ return GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
+ } catch (IOException e) {
+ // Ignore the exception
+ // Pipelines that only access public data should be able to run without credentials.
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java
new file mode 100644
index 0000000..4355a10
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Construct an oauth credential to be used by the SDK and the SDK workers.
+ * Always returns a non-null, no-op {@link Credentials} object.
+ */
+public class NoopCredentialFactory implements CredentialFactory {
+ private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory();
+ private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials();
+
+ public static NoopCredentialFactory fromOptions(PipelineOptions options) {
+ return INSTANCE;
+ }
+
+ @Override
+ public Credentials getCredential() throws IOException {
+ return NOOP_CREDENTIALS;
+ }
+
+ private static class NoopCredentials extends Credentials {
+ @Override
+ public String getAuthenticationType() {
+ return null;
+ }
+
+ @Override
+ public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean hasRequestMetadata() {
+ return false;
+ }
+
+ @Override
+ public boolean hasRequestMetadataOnly() {
+ return false;
+ }
+
+ @Override
+ public void refresh() throws IOException {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java
new file mode 100644
index 0000000..00306f2
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import java.io.IOException;
+
+/**
+ * A {@link HttpRequestInitializer} for requests that don't have credentials.
+ *
+ * <p>When access is denied, it throws a {@link RuntimeException} with a detailed error message.
+ */
+public class NullCredentialInitializer implements HttpRequestInitializer {
+ private static final int ACCESS_DENIED = 401;
+ private static final String NULL_CREDENTIAL_REASON =
+ "Unable to get application default credentials. Please see "
+ + "https://developers.google.com/accounts/docs/application-default-credentials "
+ + "for details on how to specify credentials. This version of the SDK is "
+ + "dependent on the gcloud core component version 2015.02.05 or newer to "
+ + "be able to get credentials from the currently authorized user via gcloud auth.";
+
+ @Override
+ public void initialize(HttpRequest httpRequest) throws IOException {
+ httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler());
+ }
+
+ private static class NullCredentialHttpUnsuccessfulResponseHandler
+ implements HttpUnsuccessfulResponseHandler {
+
+ @Override
+ public boolean handleResponse(
+ HttpRequest httpRequest,
+ HttpResponse httpResponse, boolean supportsRetry) throws IOException {
+ if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) {
+ throwNullCredentialException();
+ }
+ return supportsRetry;
+ }
+ }
+
+ public static void throwNullCredentialException() {
+ throw new RuntimeException(NULL_CREDENTIAL_REASON);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java
new file mode 100644
index 0000000..3d77bf2
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines classes related to interacting with {@link com.google.auth.Credentials} for
+ * pipeline creation and execution containing Google Cloud Platform components.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
new file mode 100644
index 0000000..68432cf
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.util.GcpProjectUtil;
+
+/**
+ * Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google CloudResourceManager. See "
+ + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
+public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
+ PipelineOptions, StreamingOptions {
+ /**
+ * The GcpProjectUtil instance used to communicate with Google Cloud Resource Manager.
+ */
+ @JsonIgnore
+ @Description("The GcpProjectUtil instance that should be used to communicate"
+ + " with Google Cloud Resource Manager.")
+ @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
+ @Hidden
+ GcpProjectUtil getGcpProjectUtil();
+ void setGcpProjectUtil(GcpProjectUtil value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
new file mode 100644
index 0000000..09904b6
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.auth.Credentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.DefaultBucket;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Options used to configure Google Cloud Platform specific options such as the project
+ * and credentials.
+ *
+ * <p>These options defer to the
+ * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
+ * application default credentials</a> for authentication. See the
+ * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for
+ * alternative mechanisms for creating credentials.
+ */
+@Description("Options used to configure Google Cloud Platform project and credentials.")
+public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
+ /**
+ * Project id to use when launching jobs.
+ */
+ @Description("Project id. Required when using Google Cloud Platform services. "
+ + "See https://cloud.google.com/storage/docs/projects for further details.")
+ @Default.InstanceFactory(DefaultProjectFactory.class)
+ String getProject();
+ void setProject(String value);
+
+ /**
+ * GCP <a href="https://developers.google.com/compute/docs/zones"
+ * >availability zone</a> for operations.
+ *
+ * <p>Default is set on a per-service basis.
+ */
+ @Description("GCP availability zone for running GCP operations. "
+ + "Default is up to the individual service.")
+ String getZone();
+ void setZone(String value);
+
+ /**
+ * The class of the credential factory that should be created and used to create
+ * credentials. If gcpCredential has not been set explicitly, an instance of this class will
+ * be constructed and used as a credential factory.
+ */
+ @Description("The class of the credential factory that should be created and used to create "
+ + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
+ + "be constructed and used as a credential factory.")
+ @Default.Class(GcpCredentialFactory.class)
+ Class<? extends CredentialFactory> getCredentialFactoryClass();
+ void setCredentialFactoryClass(
+ Class<? extends CredentialFactory> credentialFactoryClass);
+
+ /**
+ * The credential instance that should be used to authenticate against GCP services.
+ * If no credential has been set explicitly, the default is to use the instance factory
+ * that constructs a credential based upon the currently set credentialFactoryClass.
+ */
+ @JsonIgnore
+ @Description("The credential instance that should be used to authenticate against GCP services. "
+ + "If no credential has been set explicitly, the default is to use the instance factory "
+ + "that constructs a credential based upon the currently set credentialFactoryClass.")
+ @Default.InstanceFactory(GcpUserCredentialsFactory.class)
+ Credentials getGcpCredential();
+ void setGcpCredential(Credentials value);
+
+ /**
+ * Attempts to infer the default project based upon the environment this application
+ * is executing within. Currently this only supports getting the default project from gcloud.
+ */
+ class DefaultProjectFactory implements DefaultValueFactory<String> {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
+
+ @Override
+ public String create(PipelineOptions options) {
+ try {
+ File configFile;
+ if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
+ configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
+ } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
+ configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
+ } else {
+ // New versions of gcloud use this file
+ configFile = new File(
+ System.getProperty("user.home"),
+ ".config/gcloud/configurations/config_default");
+ if (!configFile.exists()) {
+ // Old versions of gcloud use this file
+ configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
+ }
+ }
+ String section = null;
+ Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
+ Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
+ for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
+ line = line.trim();
+ if (line.isEmpty() || line.startsWith(";")) {
+ continue;
+ }
+ Matcher matcher = sectionPattern.matcher(line);
+ if (matcher.matches()) {
+ section = matcher.group(1);
+ } else if (section == null || section.equals("core")) {
+ matcher = projectPattern.matcher(line);
+ if (matcher.matches()) {
+ String project = matcher.group(1).trim();
+ LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
+ + "project, please cancel this Pipeline and specify the command-line "
+ + "argument --project.", project);
+ return project;
+ }
+ }
+ }
+ } catch (IOException expected) {
+ LOG.debug("Failed to find default project.", expected);
+ }
+ // return null if can't determine
+ return null;
+ }
+
+ /**
+ * Returns true if running on the Windows OS.
+ */
+ private static boolean isWindows() {
+ return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
+ }
+
+ /**
+ * Used to mock out getting environment variables.
+ */
+ @VisibleForTesting
+ Map<String, String> getEnvironment() {
+ return System.getenv();
+ }
+ }
+
+ /**
+ * Attempts to load the GCP credentials. See
+ * {@link CredentialFactory#getCredential()} for more details.
+ */
+ class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> {
+ @Override
+ public Credentials create(PipelineOptions options) {
+ GcpOptions gcpOptions = options.as(GcpOptions.class);
+ try {
+ CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
+ .fromClass(gcpOptions.getCredentialFactoryClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ return factory.getCredential();
+ } catch (IOException | GeneralSecurityException e) {
+ throw new RuntimeException("Unable to obtain credential", e);
+ }
+ }
+ }
+
+ /**
+ * A GCS path for storing temporary files in GCP.
+ *
+ * <p>Defaults to {@link PipelineOptions#getTempLocation}.
+ */
+ @Description("A GCS path for storing temporary files in GCP.")
+ @Default.InstanceFactory(GcpTempLocationFactory.class)
+ @Nullable String getGcpTempLocation();
+ void setGcpTempLocation(String value);
+
+ /**
+ * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
+ */
+ class GcpTempLocationFactory implements DefaultValueFactory<String> {
+
+ @Override
+ @Nullable
+ public String create(PipelineOptions options) {
+ String tempLocation = options.getTempLocation();
+ if (isNullOrEmpty(tempLocation)) {
+ tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
+ options.setTempLocation(tempLocation);
+ } else {
+ try {
+ PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+ validator.validateOutputFilePrefixSupported(tempLocation);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path, %s. ", tempLocation), e);
+ }
+ }
+ return tempLocation;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..afc3416
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * A registrar containing the default GCP options.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>builder()
+ .add(GcpOptions.class)
+ .add(GcsOptions.class)
+ .add(GoogleApiDebugOptions.class)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
new file mode 100644
index 0000000..954092c
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.GcsPathValidator;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+
+/**
+ * Options used to configure Google Cloud Storage.
+ */
+public interface GcsOptions extends
+ ApplicationNameOptions, GcpOptions, PipelineOptions {
+ /**
+ * The GcsUtil instance that should be used to communicate with Google Cloud Storage.
+ */
+ @JsonIgnore
+ @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
+ @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
+ @Hidden
+ GcsUtil getGcsUtil();
+ void setGcsUtil(GcsUtil value);
+
+ /**
+ * The ExecutorService instance to use to create threads, can be overridden to specify an
+ * ExecutorService that is compatible with the user's environment. If unset, the
+ * default is to create an ExecutorService with an unbounded number of threads; this
+ * is compatible with Google AppEngine.
+ */
+ @JsonIgnore
+ @Description("The ExecutorService instance to use to create multiple threads. Can be overridden "
+ + "to specify an ExecutorService that is compatible with the users environment. If unset, "
+ + "the default is to create an ExecutorService with an unbounded number of threads; this "
+ + "is compatible with Google AppEngine.")
+ @Default.InstanceFactory(ExecutorServiceFactory.class)
+ @Hidden
+ ExecutorService getExecutorService();
+ void setExecutorService(ExecutorService value);
+
+ /**
+ * GCS endpoint to use. If unspecified, uses the default endpoint.
+ */
+ @JsonIgnore
+ @Hidden
+ @Description("The URL for the GCS API.")
+ String getGcsEndpoint();
+ void setGcsEndpoint(String value);
+
+ /**
+ * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
+ * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
+ * restrictions and performance implications of this value.
+ */
+ @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the "
+ + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
+ + "information on the restrictions and performance implications of this value.\n\n"
+ + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
+ + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
+ @Nullable
+ Integer getGcsUploadBufferSizeBytes();
+ void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
+
+ /**
+ * The class of the validator that should be created and used to validate paths.
+ * If pathValidator has not been set explicitly, an instance of this class will be
+ * constructed and used as the path validator.
+ */
+ @Description("The class of the validator that should be created and used to validate paths. "
+ + "If pathValidator has not been set explicitly, an instance of this class will be "
+ + "constructed and used as the path validator.")
+ @Default.Class(GcsPathValidator.class)
+ Class<? extends PathValidator> getPathValidatorClass();
+ void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+ /**
+ * The path validator instance that should be used to validate paths.
+ * If no path validator has been set explicitly, the default is to use the instance factory that
+ * constructs a path validator based upon the currently set pathValidatorClass.
+ */
+ @JsonIgnore
+ @Description("The path validator instance that should be used to validate paths. "
+ + "If no path validator has been set explicitly, the default is to use the instance factory "
+ + "that constructs a path validator based upon the currently set pathValidatorClass.")
+ @Default.InstanceFactory(PathValidatorFactory.class)
+ PathValidator getPathValidator();
+ void setPathValidator(PathValidator validator);
+
+ /**
+ * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The
+ * {@link ExecutorService} is compatible with AppEngine.
+ */
+ class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
+ @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only.
+ @Override
+ public ExecutorService create(PipelineOptions options) {
+ ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+ threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+ threadFactoryBuilder.setDaemon(true);
+ /* The SDK requires an unbounded thread pool because a step may create X writers
+ * each requiring their own thread to perform the writes otherwise a writer may
+ * block causing deadlock for the step because the writer's buffer is full.
+ * Also, the MapTaskExecutor launches the steps in reverse order and completes
+ * them in forward order thus requiring enough threads so that each step's writers
+ * can be active.
+ */
+ return new ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
+ Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
+ new SynchronousQueue<Runnable>(),
+ threadFactoryBuilder.build());
+ }
+ }
+
+ /**
+ * Creates a {@link PathValidator} object using the class specified in
+ * {@link #getPathValidatorClass()}.
+ */
+ class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+ @Override
+ public PathValidator create(PipelineOptions options) {
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ return InstanceBuilder.ofType(PathValidator.class)
+ .fromClass(gcsOptions.getPathValidatorClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java
new file mode 100644
index 0000000..01144c4
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import com.google.api.client.googleapis.services.AbstractGoogleClient;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.googleapis.services.GoogleClientRequestInitializer;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * These options configure debug settings for Google API clients created within the Apache Beam SDK.
+ */
+public interface GoogleApiDebugOptions extends PipelineOptions {
+ /**
+ * This option enables tracing of API calls to Google services used within the Apache
+ * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...}
+ * </code> where {@code ApiName} is matched as a substring of the request class's canonical name.
+ * The {@code TraceDestination} is a logical trace consumer to whom the trace will be reported.
+ * Typically, "producer" is the right destination to use: this makes API traces available to the
+ * team offering the API. Note that by enabling this option, the contents of the requests to and
+ * from Google Cloud services will be made available to Google. For example, by specifying
+ * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available
+ * to Google, specifically to the Google Cloud Dataflow team.
+ */
+ @Description("This option enables tracing of API calls to Google services used within the Apache "
+ + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} "
+ + "where the ApiName represents the request classes canonical name. The TraceDestination is "
+ + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is "
+ + "the right destination to use: this makes API traces available to the team offering the "
+ + "API. Note that by enabling this option, the contents of the requests to and from "
+ + "Google Cloud services will be made available to Google. For example, by specifying "
+ + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to "
+ + "Google, specifically to the Google Cloud Dataflow team.")
+ GoogleApiTracer getGoogleApiTrace();
+ void setGoogleApiTrace(GoogleApiTracer commands);
+
+ /**
+ * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls.
+ */
+ class GoogleApiTracer extends HashMap<String, String>
+ implements GoogleClientRequestInitializer {
+ /**
+ * Registers {@code traceDestination} for requests whose canonical class name contains that
+ * of the given client's class, and returns this {@link GoogleApiTracer} for chaining.
+ */
+ public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) {
+ put(client.getClass().getCanonicalName(), traceDestination);
+ return this;
+ }
+
+ /**
+ * Registers {@code traceDestination} for requests whose canonical class name contains that
+ * of the given request's class, and returns this {@link GoogleApiTracer} for chaining.
+ */
+ public GoogleApiTracer addTraceFor(
+ AbstractGoogleClientRequest<?> request, String traceDestination) {
+ put(request.getClass().getCanonicalName(), traceDestination);
+ return this;
+ }
+
+ @Override
+ public void initialize(AbstractGoogleClientRequest<?> request) throws IOException {
+ for (Map.Entry<String, String> entry : this.entrySet()) { // every entry is checked; no break
+ if (request.getClass().getCanonicalName().contains(entry.getKey())) {
+ request.set("$trace", entry.getValue());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java
new file mode 100644
index 0000000..bc9646c
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines {@link org.apache.beam.sdk.options.PipelineOptions} for
+ * configuring pipeline execution for Google Cloud Platform components.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
deleted file mode 100644
index 13fdaf3..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.beam.sdk.util.GcpProjectUtil;
-
-/**
- * Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google CloudResourceManager. See "
- + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
-public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
- /**
- * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage.
- */
- @JsonIgnore
- @Description("The GcpProjectUtil instance that should be used to communicate"
- + " with Google Cloud Resource Manager.")
- @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
- @Hidden
- GcpProjectUtil getGcpProjectUtil();
- void setGcpProjectUtil(GcpProjectUtil value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
deleted file mode 100644
index d01406f..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.auth.Credentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
-import java.util.Locale;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.CredentialFactory;
-import org.apache.beam.sdk.util.DefaultBucket;
-import org.apache.beam.sdk.util.GcpCredentialFactory;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Options used to configure Google Cloud Platform specific options such as the project
- * and credentials.
- *
- * <p>These options defer to the
- * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
- * application default credentials</a> for authentication. See the
- * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for
- * alternative mechanisms for creating credentials.
- */
-@Description("Options used to configure Google Cloud Platform project and credentials.")
-public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
- /**
- * Project id to use when launching jobs.
- */
- @Description("Project id. Required when using Google Cloud Platform services. "
- + "See https://cloud.google.com/storage/docs/projects for further details.")
- @Default.InstanceFactory(DefaultProjectFactory.class)
- String getProject();
- void setProject(String value);
-
- /**
- * GCP <a href="https://developers.google.com/compute/docs/zones"
- * >availability zone</a> for operations.
- *
- * <p>Default is set on a per-service basis.
- */
- @Description("GCP availability zone for running GCP operations. "
- + "Default is up to the individual service.")
- String getZone();
- void setZone(String value);
-
- /**
- * The class of the credential factory that should be created and used to create
- * credentials. If gcpCredential has not been set explicitly, an instance of this class will
- * be constructed and used as a credential factory.
- */
- @Description("The class of the credential factory that should be created and used to create "
- + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
- + "be constructed and used as a credential factory.")
- @Default.Class(GcpCredentialFactory.class)
- Class<? extends CredentialFactory> getCredentialFactoryClass();
- void setCredentialFactoryClass(
- Class<? extends CredentialFactory> credentialFactoryClass);
-
- /**
- * The credential instance that should be used to authenticate against GCP services.
- * If no credential has been set explicitly, the default is to use the instance factory
- * that constructs a credential based upon the currently set credentialFactoryClass.
- */
- @JsonIgnore
- @Description("The credential instance that should be used to authenticate against GCP services. "
- + "If no credential has been set explicitly, the default is to use the instance factory "
- + "that constructs a credential based upon the currently set credentialFactoryClass.")
- @Default.InstanceFactory(GcpUserCredentialsFactory.class)
- Credentials getGcpCredential();
- void setGcpCredential(Credentials value);
-
- /**
- * Attempts to infer the default project based upon the environment this application
- * is executing within. Currently this only supports getting the default project from gcloud.
- */
- class DefaultProjectFactory implements DefaultValueFactory<String> {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
-
- @Override
- public String create(PipelineOptions options) {
- try {
- File configFile;
- if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
- configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
- } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
- configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
- } else {
- // New versions of gcloud use this file
- configFile = new File(
- System.getProperty("user.home"),
- ".config/gcloud/configurations/config_default");
- if (!configFile.exists()) {
- // Old versions of gcloud use this file
- configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
- }
- }
- String section = null;
- Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
- Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
- for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
- line = line.trim();
- if (line.isEmpty() || line.startsWith(";")) {
- continue;
- }
- Matcher matcher = sectionPattern.matcher(line);
- if (matcher.matches()) {
- section = matcher.group(1);
- } else if (section == null || section.equals("core")) {
- matcher = projectPattern.matcher(line);
- if (matcher.matches()) {
- String project = matcher.group(1).trim();
- LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
- + "project, please cancel this Pipeline and specify the command-line "
- + "argument --project.", project);
- return project;
- }
- }
- }
- } catch (IOException expected) {
- LOG.debug("Failed to find default project.", expected);
- }
- // return null if can't determine
- return null;
- }
-
- /**
- * Returns true if running on the Windows OS.
- */
- private static boolean isWindows() {
- return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
- }
-
- /**
- * Used to mock out getting environment variables.
- */
- @VisibleForTesting
- Map<String, String> getEnvironment() {
- return System.getenv();
- }
- }
-
- /**
- * Attempts to load the GCP credentials. See
- * {@link CredentialFactory#getCredential()} for more details.
- */
- class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> {
- @Override
- public Credentials create(PipelineOptions options) {
- GcpOptions gcpOptions = options.as(GcpOptions.class);
- try {
- CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
- .fromClass(gcpOptions.getCredentialFactoryClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- return factory.getCredential();
- } catch (IOException | GeneralSecurityException e) {
- throw new RuntimeException("Unable to obtain credential", e);
- }
- }
- }
-
- /**
- * A GCS path for storing temporary files in GCP.
- *
- * <p>Its default to {@link PipelineOptions#getTempLocation}.
- */
- @Description("A GCS path for storing temporary files in GCP.")
- @Default.InstanceFactory(GcpTempLocationFactory.class)
- @Nullable String getGcpTempLocation();
- void setGcpTempLocation(String value);
-
- /**
- * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
- */
- class GcpTempLocationFactory implements DefaultValueFactory<String> {
-
- @Override
- @Nullable
- public String create(PipelineOptions options) {
- String tempLocation = options.getTempLocation();
- if (isNullOrEmpty(tempLocation)) {
- tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
- options.setTempLocation(tempLocation);
- } else {
- try {
- PathValidator validator = options.as(GcsOptions.class).getPathValidator();
- validator.validateOutputFilePrefixSupported(tempLocation);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format(
- "Error constructing default value for gcpTempLocation: tempLocation is not"
- + " a valid GCS path, %s. ", tempLocation), e);
- }
- }
- return tempLocation;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
deleted file mode 100644
index 411121c..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.options;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-/**
- * A registrar containing the default GCP options.
- */
-@AutoService(PipelineOptionsRegistrar.class)
-public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>builder()
- .add(GcpOptions.class)
- .add(GcsOptions.class)
- .add(GoogleApiDebugOptions.class)
- .build();
- }
-}