You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/18 23:19:32 UTC

[6/7] beam git commit: [BEAM-1871] Create new GCP core module package and move several GCP related classes from beam-sdks-java-core over.

[BEAM-1871] Create new GCP core module package and move several GCP related classes from beam-sdks-java-core over.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be92f595
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be92f595
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be92f595

Branch: refs/heads/master
Commit: be92f5952706c2cbb980df5073fbc74925ed68e6
Parents: ed52d32
Author: Luke Cwik <lc...@google.com>
Authored: Mon Apr 17 18:02:02 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Apr 18 16:18:53 2017 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |   5 +
 examples/java8/pom.xml                          |   5 +
 pom.xml                                         |   6 +
 runners/flink/examples/pom.xml                  |   4 +
 runners/google-cloud-dataflow-java/pom.xml      |   5 +
 sdks/java/core/pom.xml                          |  46 +-
 .../beam/sdk/options/BigQueryOptions.java       |  32 -
 .../options/CloudResourceManagerOptions.java    |  40 -
 .../DefaultPipelineOptionsRegistrar.java        |   5 -
 .../org/apache/beam/sdk/options/GcpOptions.java | 227 ------
 .../org/apache/beam/sdk/options/GcsOptions.java | 158 ----
 .../beam/sdk/options/GoogleApiDebugOptions.java |  87 --
 .../beam/sdk/options/PipelineOptions.java       |   4 +-
 .../apache/beam/sdk/options/PubsubOptions.java  |  36 -
 .../apache/beam/sdk/runners/PipelineRunner.java |   2 +
 .../beam/sdk/testing/BigqueryMatcher.java       | 256 ------
 .../apache/beam/sdk/testing/TestPipeline.java   |   3 -
 .../beam/sdk/util/AppEngineEnvironment.java     |  62 --
 .../apache/beam/sdk/util/CredentialFactory.java |  29 -
 .../org/apache/beam/sdk/util/DefaultBucket.java | 105 ---
 .../beam/sdk/util/GcpCredentialFactory.java     |  67 --
 .../apache/beam/sdk/util/GcpProjectUtil.java    | 106 ---
 .../beam/sdk/util/GcsIOChannelFactory.java      | 111 ---
 .../sdk/util/GcsIOChannelFactoryRegistrar.java  |  38 -
 .../apache/beam/sdk/util/GcsPathValidator.java  |  95 ---
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 798 -------------------
 .../util/IntervalBoundedExponentialBackOff.java |  89 ---
 .../beam/sdk/util/NoopCredentialFactory.java    |  68 --
 .../sdk/util/NullCredentialInitializer.java     |  62 --
 .../org/apache/beam/sdk/util/Transport.java     | 178 -----
 .../org/apache/beam/SdkCoreApiSurfaceTest.java  |   2 -
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  97 +--
 .../apache/beam/sdk/options/GcpOptionsTest.java | 171 ----
 .../sdk/options/GoogleApiDebugOptionsTest.java  | 145 ----
 .../sdk/options/PipelineOptionsFactoryTest.java |   4 +-
 .../beam/sdk/runners/PipelineRunnerTest.java    |  46 +-
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 176 ----
 .../beam/sdk/testing/TestPipelineTest.java      |   6 +-
 .../apache/beam/sdk/util/DefaultBucketTest.java | 112 ---
 .../beam/sdk/util/GcpProjectUtilTest.java       |  76 --
 .../util/GcsIOChannelFactoryRegistrarTest.java  |  44 -
 .../beam/sdk/util/GcsIOChannelFactoryTest.java  |  43 -
 .../beam/sdk/util/GcsPathValidatorTest.java     |  87 --
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 798 -------------------
 .../IntervalBoundedExponentialBackOffTest.java  | 100 ---
 .../util/RetryHttpRequestInitializerTest.java   | 290 -------
 sdks/java/extensions/gcp-core/pom.xml           | 217 +++++
 .../beam/sdk/options/BigQueryOptions.java       |  32 +
 .../options/CloudResourceManagerOptions.java    |  40 +
 .../org/apache/beam/sdk/options/GcpOptions.java | 227 ++++++
 .../options/GcpPipelineOptionsRegistrar.java    |  39 +
 .../org/apache/beam/sdk/options/GcsOptions.java | 158 ++++
 .../beam/sdk/options/GoogleApiDebugOptions.java |  87 ++
 .../apache/beam/sdk/options/PubsubOptions.java  |  36 +
 .../apache/beam/sdk/options/package-info.java   |  22 +
 .../beam/sdk/testing/BigqueryMatcher.java       | 256 ++++++
 .../apache/beam/sdk/testing/package-info.java   |  21 +
 .../beam/sdk/util/AppEngineEnvironment.java     |  62 ++
 .../apache/beam/sdk/util/CredentialFactory.java |  29 +
 .../org/apache/beam/sdk/util/DefaultBucket.java | 105 +++
 .../beam/sdk/util/GcpCredentialFactory.java     |  67 ++
 .../apache/beam/sdk/util/GcpProjectUtil.java    | 106 +++
 .../beam/sdk/util/GcsIOChannelFactory.java      | 111 +++
 .../sdk/util/GcsIOChannelFactoryRegistrar.java  |  38 +
 .../apache/beam/sdk/util/GcsPathValidator.java  |  95 +++
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 798 +++++++++++++++++++
 .../util/IntervalBoundedExponentialBackOff.java |  89 +++
 .../beam/sdk/util/NoopCredentialFactory.java    |  68 ++
 .../sdk/util/NullCredentialInitializer.java     |  62 ++
 .../org/apache/beam/sdk/util/Transport.java     | 178 +++++
 .../org/apache/beam/sdk/util/package-info.java  |  20 +
 .../org/apache/beam/GcpCoreApiSurfaceTest.java  |  62 ++
 .../apache/beam/sdk/options/GcpOptionsTest.java | 171 ++++
 .../sdk/options/GoogleApiDebugOptionsTest.java  | 145 ++++
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 176 ++++
 .../apache/beam/sdk/util/DefaultBucketTest.java | 112 +++
 .../beam/sdk/util/GcpProjectUtilTest.java       |  76 ++
 .../util/GcsIOChannelFactoryRegistrarTest.java  |  44 +
 .../beam/sdk/util/GcsIOChannelFactoryTest.java  |  43 +
 .../beam/sdk/util/GcsPathValidatorTest.java     |  87 ++
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 798 +++++++++++++++++++
 .../IntervalBoundedExponentialBackOffTest.java  | 100 +++
 .../util/RetryHttpRequestInitializerTest.java   | 290 +++++++
 sdks/java/extensions/pom.xml                    |   1 +
 sdks/java/harness/pom.xml                       |   5 +
 sdks/java/io/google-cloud-platform/pom.xml      |  20 +-
 86 files changed, 5130 insertions(+), 4889 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 021a819..ae3d63d 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -461,6 +461,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 912c341..cd69acb 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -208,6 +208,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09f3985..306978d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -357,6 +357,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-extensions-sorter</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index fa642bd..aaf76d9 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -65,6 +65,10 @@
   </profiles>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index ff63a31..68d433a 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -171,6 +171,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-common-runner-api</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 0ac40f4..4ba8e3b 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -145,38 +145,30 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.auth</groupId>
-      <artifactId>google-auth-library-oauth2-http</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.api-client</groupId>
-      <artifactId>google-api-client</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-bigquery</artifactId>
     </dependency>
 
     <dependency>
       <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-cloudresourcemanager</artifactId>
+      <artifactId>google-api-services-storage</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-pubsub</artifactId>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-storage</artifactId>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>gcsio</artifactId>
+      <scope>runtime</scope>
     </dependency>
 
     <dependency>
-      <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client</artifactId>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util</artifactId>
+      <scope>runtime</scope>
     </dependency>
 
     <!-- Required by com.google.apis:google-api-services-datastore-protobuf, but 
@@ -189,31 +181,11 @@
 
     <dependency>
       <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client-jackson2</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.http-client</groupId>
       <artifactId>google-http-client-protobuf</artifactId>
       <scope>runtime</scope>
     </dependency>
 
     <dependency>
-      <groupId>com.google.oauth-client</groupId>
-      <artifactId>google-oauth-client</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.cloud.bigdataoss</groupId>
-      <artifactId>gcsio</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.cloud.bigdataoss</groupId>
-      <artifactId>util</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
deleted file mode 100644
index 7672cd7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties needed when using Google BigQuery with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google BigQuery. See "
-    + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-  @Description("Temporary dataset for BigQuery table operations. "
-      + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
-  @Default.String("bigquery.googleapis.com/cloud_dataflow")
-  String getTempDatasetId();
-  void setTempDatasetId(String value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
deleted file mode 100644
index 13fdaf3..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.beam.sdk.util.GcpProjectUtil;
-
-/**
- * Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google CloudResourceManager. See "
-    + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
-public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-  /**
-   * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage.
-   */
-  @JsonIgnore
-  @Description("The GcpProjectUtil instance that should be used to communicate"
-               + " with Google Cloud Resource Manager.")
-  @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
-  @Hidden
-  GcpProjectUtil getGcpProjectUtil();
-  void setGcpProjectUtil(GcpProjectUtil value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
index 069c109..b0ce812 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
@@ -32,11 +32,6 @@ public class DefaultPipelineOptionsRegistrar implements PipelineOptionsRegistrar
         .add(PipelineOptions.class)
         .add(ApplicationNameOptions.class)
         .add(StreamingOptions.class)
-        .add(BigQueryOptions.class)
-        .add(GcpOptions.class)
-        .add(GcsOptions.class)
-        .add(GoogleApiDebugOptions.class)
-        .add(PubsubOptions.class)
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
deleted file mode 100644
index d01406f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.auth.Credentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
-import java.util.Locale;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.CredentialFactory;
-import org.apache.beam.sdk.util.DefaultBucket;
-import org.apache.beam.sdk.util.GcpCredentialFactory;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Options used to configure Google Cloud Platform specific options such as the project
- * and credentials.
- *
- * <p>These options defer to the
- * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
- * application default credentials</a> for authentication. See the
- * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for
- * alternative mechanisms for creating credentials.
- */
-@Description("Options used to configure Google Cloud Platform project and credentials.")
-public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
-  /**
-   * Project id to use when launching jobs.
-   */
-  @Description("Project id. Required when using Google Cloud Platform services. "
-      + "See https://cloud.google.com/storage/docs/projects for further details.")
-  @Default.InstanceFactory(DefaultProjectFactory.class)
-  String getProject();
-  void setProject(String value);
-
-  /**
-   * GCP <a href="https://developers.google.com/compute/docs/zones"
-   * >availability zone</a> for operations.
-   *
-   * <p>Default is set on a per-service basis.
-   */
-  @Description("GCP availability zone for running GCP operations. "
-      + "Default is up to the individual service.")
-  String getZone();
-  void setZone(String value);
-
-  /**
-   * The class of the credential factory that should be created and used to create
-   * credentials. If gcpCredential has not been set explicitly, an instance of this class will
-   * be constructed and used as a credential factory.
-   */
-  @Description("The class of the credential factory that should be created and used to create "
-      + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
-      + "be constructed and used as a credential factory.")
-  @Default.Class(GcpCredentialFactory.class)
-  Class<? extends CredentialFactory> getCredentialFactoryClass();
-  void setCredentialFactoryClass(
-      Class<? extends CredentialFactory> credentialFactoryClass);
-
-  /**
-   * The credential instance that should be used to authenticate against GCP services.
-   * If no credential has been set explicitly, the default is to use the instance factory
-   * that constructs a credential based upon the currently set credentialFactoryClass.
-   */
-  @JsonIgnore
-  @Description("The credential instance that should be used to authenticate against GCP services. "
-      + "If no credential has been set explicitly, the default is to use the instance factory "
-      + "that constructs a credential based upon the currently set credentialFactoryClass.")
-  @Default.InstanceFactory(GcpUserCredentialsFactory.class)
-  Credentials getGcpCredential();
-  void setGcpCredential(Credentials value);
-
-  /**
-   * Attempts to infer the default project based upon the environment this application
-   * is executing within. Currently this only supports getting the default project from gcloud.
-   */
-  class DefaultProjectFactory implements DefaultValueFactory<String> {
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
-
-    @Override
-    public String create(PipelineOptions options) {
-      try {
-        File configFile;
-        if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
-          configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
-        } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
-          configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
-        } else {
-          // New versions of gcloud use this file
-          configFile = new File(
-              System.getProperty("user.home"),
-              ".config/gcloud/configurations/config_default");
-          if (!configFile.exists()) {
-            // Old versions of gcloud use this file
-            configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
-          }
-        }
-        String section = null;
-        Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
-        Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
-        for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
-          line = line.trim();
-          if (line.isEmpty() || line.startsWith(";")) {
-            continue;
-          }
-          Matcher matcher = sectionPattern.matcher(line);
-          if (matcher.matches()) {
-            section = matcher.group(1);
-          } else if (section == null || section.equals("core")) {
-            matcher = projectPattern.matcher(line);
-            if (matcher.matches()) {
-              String project = matcher.group(1).trim();
-              LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
-                  + "project, please cancel this Pipeline and specify the command-line "
-                  + "argument --project.", project);
-              return project;
-            }
-          }
-        }
-      } catch (IOException expected) {
-        LOG.debug("Failed to find default project.", expected);
-      }
-      // return null if can't determine
-      return null;
-    }
-
-    /**
-     * Returns true if running on the Windows OS.
-     */
-    private static boolean isWindows() {
-      return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
-    }
-
-    /**
-     * Used to mock out getting environment variables.
-     */
-    @VisibleForTesting
-    Map<String, String> getEnvironment() {
-        return System.getenv();
-    }
-  }
-
-  /**
-   * Attempts to load the GCP credentials. See
-   * {@link CredentialFactory#getCredential()} for more details.
-   */
-  class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> {
-    @Override
-    public Credentials create(PipelineOptions options) {
-      GcpOptions gcpOptions = options.as(GcpOptions.class);
-      try {
-        CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
-            .fromClass(gcpOptions.getCredentialFactoryClass())
-            .fromFactoryMethod("fromOptions")
-            .withArg(PipelineOptions.class, options)
-            .build();
-        return factory.getCredential();
-      } catch (IOException | GeneralSecurityException e) {
-        throw new RuntimeException("Unable to obtain credential", e);
-      }
-    }
-  }
-
-  /**
-   * A GCS path for storing temporary files in GCP.
-   *
-   * <p>Its default to {@link PipelineOptions#getTempLocation}.
-   */
-  @Description("A GCS path for storing temporary files in GCP.")
-  @Default.InstanceFactory(GcpTempLocationFactory.class)
-  @Nullable String getGcpTempLocation();
-  void setGcpTempLocation(String value);
-
-  /**
-   * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
-   */
-  class GcpTempLocationFactory implements DefaultValueFactory<String> {
-
-    @Override
-    @Nullable
-    public String create(PipelineOptions options) {
-      String tempLocation = options.getTempLocation();
-      if (isNullOrEmpty(tempLocation)) {
-        tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
-        options.setTempLocation(tempLocation);
-      } else {
-        try {
-          PathValidator validator = options.as(GcsOptions.class).getPathValidator();
-          validator.validateOutputFilePrefixSupported(tempLocation);
-        } catch (Exception e) {
-          throw new IllegalArgumentException(String.format(
-              "Error constructing default value for gcpTempLocation: tempLocation is not"
-              + " a valid GCS path, %s. ", tempLocation), e);
-        }
-      }
-      return tempLocation;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
deleted file mode 100644
index 2187e7d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.AppEngineEnvironment;
-import org.apache.beam.sdk.util.GcsPathValidator;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
-
-/**
- * Options used to configure Google Cloud Storage.
- */
-public interface GcsOptions extends
-    ApplicationNameOptions, GcpOptions, PipelineOptions {
-  /**
-   * The GcsUtil instance that should be used to communicate with Google Cloud Storage.
-   */
-  @JsonIgnore
-  @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
-  @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
-  @Hidden
-  GcsUtil getGcsUtil();
-  void setGcsUtil(GcsUtil value);
-
-  /**
-   * The ExecutorService instance to use to create threads, can be overridden to specify an
-   * ExecutorService that is compatible with the users environment. If unset, the
-   * default is to create an ExecutorService with an unbounded number of threads; this
-   * is compatible with Google AppEngine.
-   */
-  @JsonIgnore
-  @Description("The ExecutorService instance to use to create multiple threads. Can be overridden "
-      + "to specify an ExecutorService that is compatible with the users environment. If unset, "
-      + "the default is to create an ExecutorService with an unbounded number of threads; this "
-      + "is compatible with Google AppEngine.")
-  @Default.InstanceFactory(ExecutorServiceFactory.class)
-  @Hidden
-  ExecutorService getExecutorService();
-  void setExecutorService(ExecutorService value);
-
-  /**
-   * GCS endpoint to use. If unspecified, uses the default endpoint.
-   */
-  @JsonIgnore
-  @Hidden
-  @Description("The URL for the GCS API.")
-  String getGcsEndpoint();
-  void setGcsEndpoint(String value);
-
-  /**
-   * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
-   * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
-   * restrictions and performance implications of this value.
-   */
-  @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the "
-      + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
-      + "information on the restrictions and performance implications of this value.\n\n"
-      + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
-      + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
-  @Nullable
-  Integer getGcsUploadBufferSizeBytes();
-  void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
-
-  /**
-   * The class of the validator that should be created and used to validate paths.
-   * If pathValidator has not been set explicitly, an instance of this class will be
-   * constructed and used as the path validator.
-   */
-  @Description("The class of the validator that should be created and used to validate paths. "
-      + "If pathValidator has not been set explicitly, an instance of this class will be "
-      + "constructed and used as the path validator.")
-  @Default.Class(GcsPathValidator.class)
-  Class<? extends PathValidator> getPathValidatorClass();
-  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
-  /**
-   * The path validator instance that should be used to validate paths.
-   * If no path validator has been set explicitly, the default is to use the instance factory that
-   * constructs a path validator based upon the currently set pathValidatorClass.
-   */
-  @JsonIgnore
-  @Description("The path validator instance that should be used to validate paths. "
-      + "If no path validator has been set explicitly, the default is to use the instance factory "
-      + "that constructs a path validator based upon the currently set pathValidatorClass.")
-  @Default.InstanceFactory(PathValidatorFactory.class)
-  PathValidator getPathValidator();
-  void setPathValidator(PathValidator validator);
-
-  /**
-   * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The
-   * {@link ExecutorService} is compatible with AppEngine.
-   */
-  class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
-    @SuppressWarnings("deprecation")  // IS_APP_ENGINE is deprecated for internal use only.
-    @Override
-    public ExecutorService create(PipelineOptions options) {
-      ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
-      threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
-      if (!AppEngineEnvironment.IS_APP_ENGINE) {
-        // AppEngine doesn't allow modification of threads to be daemon threads.
-        threadFactoryBuilder.setDaemon(true);
-      }
-      /* The SDK requires an unbounded thread pool because a step may create X writers
-       * each requiring their own thread to perform the writes otherwise a writer may
-       * block causing deadlock for the step because the writers buffer is full.
-       * Also, the MapTaskExecutor launches the steps in reverse order and completes
-       * them in forward order thus requiring enough threads so that each step's writers
-       * can be active.
-       */
-      return new ThreadPoolExecutor(
-          0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-          Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-          new SynchronousQueue<Runnable>(),
-          threadFactoryBuilder.build());
-    }
-  }
-
-  /**
-   * Creates a {@link PathValidator} object using the class specified in
-   * {@link #getPathValidatorClass()}.
-   */
-  class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
-    @Override
-    public PathValidator create(PipelineOptions options) {
-      GcsOptions gcsOptions = options.as(GcsOptions.class);
-      return InstanceBuilder.ofType(PathValidator.class)
-          .fromClass(gcsOptions.getPathValidatorClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
deleted file mode 100644
index f9cb575..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.google.api.client.googleapis.services.AbstractGoogleClient;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.googleapis.services.GoogleClientRequestInitializer;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * These options configure debug settings for Google API clients created within the Apache Beam SDK.
- */
-public interface GoogleApiDebugOptions extends PipelineOptions {
-  /**
-   * This option enables tracing of API calls to Google services used within the Apache
-   * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...}
-   * </code> where the {@code ApiName} represents the request classes canonical name. The
-   * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported.
-   * Typically, "producer" is the right destination to use: this makes API traces available to the
-   * team offering the API. Note that by enabling this option, the contents of the requests to and
-   * from Google Cloud services will be made available to Google. For example, by specifying
-   * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available
-   * to Google, specifically to the Google Cloud Dataflow team.
-   */
-  @Description("This option enables tracing of API calls to Google services used within the Apache "
-      + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} "
-      + "where the ApiName represents the request classes canonical name. The TraceDestination is "
-      + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is "
-      + "the right destination to use: this makes API traces available to the team offering the "
-      + "API. Note that by enabling this option, the contents of the requests to and from "
-      + "Google Cloud services will be made available to Google. For example, by specifying "
-      + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to "
-      + "Google, specifically to the Google Cloud Dataflow team.")
-  GoogleApiTracer getGoogleApiTrace();
-  void setGoogleApiTrace(GoogleApiTracer commands);
-
-  /**
-   * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls.
-   */
-  class GoogleApiTracer extends HashMap<String, String>
-      implements GoogleClientRequestInitializer {
-    /**
-     * Creates a {@link GoogleApiTracer} that sets the trace destination on all
-     * calls that match the given client type.
-     */
-    public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) {
-      put(client.getClass().getCanonicalName(), traceDestination);
-      return this;
-    }
-
-    /**
-     * Creates a {@link GoogleApiTracer} that sets the trace {@code traceDestination} on all
-     * calls that match for the given request type.
-     */
-    public GoogleApiTracer addTraceFor(
-        AbstractGoogleClientRequest<?> request, String traceDestination) {
-      put(request.getClass().getCanonicalName(), traceDestination);
-      return this;
-    }
-
-    @Override
-    public void initialize(AbstractGoogleClientRequest<?> request) throws IOException {
-      for (Map.Entry<String, String> entry : this.entrySet()) {
-        if (request.getClass().getCanonicalName().contains(entry.getKey())) {
-          request.set("$trace", entry.getValue());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 4e7bc89..88d6576 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -187,8 +186,7 @@ import org.joda.time.format.DateTimeFormatter;
  * <a href="https://github.com/FasterXML/jackson-annotations">annotations</a> to aid in
  * serialization of custom types. We point you to the public
  * <a href="https://github.com/FasterXML/jackson">Jackson documentation</a> when attempting
- * to add serialization support for your custom types. See {@link GoogleApiTracer} for an
- * example using the Jackson annotations to serialize and deserialize a custom type.
+ * to add serialization support for your custom types.
  *
  * <p>Note: It is an error to have the same property available in multiple interfaces with only
  * some of them being annotated with {@link JsonIgnore @JsonIgnore}. It is also an error to mark a

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
deleted file mode 100644
index b065d19..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google Cloud Pub/Sub. See "
-    + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
-public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-
-  /**
-   * Root URL for use with the Google Cloud Pub/Sub API.
-   */
-  @Description("Root URL for use with the Google Cloud Pub/Sub API")
-  @Default.String("https://pubsub.googleapis.com")
-  @Hidden
-  String getPubsubRootUrl();
-  void setPubsubRootUrl(String value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 7b2fba3..a318dfc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 
@@ -41,6 +42,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
    */
   public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) {
     checkNotNull(options);
+    PipelineOptionsValidator.validate(PipelineOptions.class, options);
 
     // (Re-)register standard IO factories. Clobbers any prior credentials.
     IOChannelUtils.registerIOFactoriesAllowOverride(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
deleted file mode 100644
index 8f752c0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.BigqueryScopes;
-import com.google.api.services.bigquery.model.QueryRequest;
-import com.google.api.services.bigquery.model.QueryResponse;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Transport;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A matcher to verify data in BigQuery by processing given query
- * and comparing with content's checksum.
- *
- * <p>Example:
- * <pre>{@code [
- *   assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum));
- * ]}</pre>
- */
-@NotThreadSafe
-public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
-    implements SerializableMatcher<PipelineResult> {
-  private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class);
-
-  // The maximum number of retries to execute a BigQuery RPC
-  static final int MAX_QUERY_RETRIES = 4;
-
-  // The initial backoff for executing a BigQuery RPC
-  private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L);
-
-  // The total number of rows in query response to be formatted for debugging purpose
-  private static final int TOTAL_FORMATTED_ROWS = 20;
-
-  // The backoff factory with initial configs
-  static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT
-          .withMaxRetries(MAX_QUERY_RETRIES)
-          .withInitialBackoff(INITIAL_BACKOFF);
-
-  private final String applicationName;
-  private final String projectId;
-  private final String query;
-  private final String expectedChecksum;
-  private String actualChecksum;
-  private transient QueryResponse response;
-
-  public BigqueryMatcher(
-      String applicationName, String projectId, String query, String expectedChecksum) {
-    validateArgument("applicationName", applicationName);
-    validateArgument("projectId", projectId);
-    validateArgument("query", query);
-    validateArgument("expectedChecksum", expectedChecksum);
-
-    this.applicationName = applicationName;
-    this.projectId = projectId;
-    this.query = query;
-    this.expectedChecksum = expectedChecksum;
-  }
-
-  @Override
-  protected boolean matchesSafely(PipelineResult pipelineResult) {
-    LOG.info("Verifying Bigquery data");
-    Bigquery bigqueryClient = newBigqueryClient(applicationName);
-
-    // execute query
-    LOG.debug("Executing query: {}", query);
-    try {
-      QueryRequest queryContent = new QueryRequest();
-      queryContent.setQuery(query);
-
-      response = queryWithRetries(
-          bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
-    } catch (IOException | InterruptedException e) {
-      if (e instanceof InterruptedIOException) {
-        Thread.currentThread().interrupt();
-      }
-      throw new RuntimeException("Failed to fetch BigQuery data.", e);
-    }
-
-    if (!response.getJobComplete()) {
-      // query job not complete, verification failed
-      return false;
-    } else {
-      // compute checksum
-      actualChecksum = generateHash(response.getRows());
-      LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
-
-      return expectedChecksum.equals(actualChecksum);
-    }
-  }
-
-  @VisibleForTesting
-  Bigquery newBigqueryClient(String applicationName) {
-    HttpTransport transport = Transport.getTransport();
-    JsonFactory jsonFactory = Transport.getJsonFactory();
-    Credentials credential = getDefaultCredential();
-
-    return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential))
-        .setApplicationName(applicationName)
-        .build();
-  }
-
-  @Nonnull
-  @VisibleForTesting
-  QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
-                                 Sleeper sleeper, BackOff backOff)
-      throws IOException, InterruptedException {
-    IOException lastException = null;
-    do {
-      if (lastException != null) {
-        LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
-      }
-      try {
-        QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
-        if (response != null) {
-          return response;
-        } else {
-          lastException =
-              new IOException("Expected valid response from query job, but received null.");
-        }
-      } catch (IOException e) {
-        // ignore and retry
-        lastException = e;
-      }
-    } while(BackOffUtils.next(sleeper, backOff));
-
-    throw new RuntimeException(
-        String.format(
-            "Unable to get BigQuery response after retrying %d times using query (%s)",
-            MAX_QUERY_RETRIES,
-            queryContent.getQuery()),
-        lastException);
-  }
-
-  private void validateArgument(String name, String value) {
-    checkArgument(
-        !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value);
-  }
-
-  private Credentials getDefaultCredential() {
-    GoogleCredentials credential;
-    try {
-      credential = GoogleCredentials.getApplicationDefault();
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to get application default credential.", e);
-    }
-
-    if (credential.createScopedRequired()) {
-      Collection<String> bigqueryScope =
-          Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY);
-      credential = credential.createScoped(bigqueryScope);
-    }
-    return credential;
-  }
-
-  private String generateHash(@Nonnull List<TableRow> rows) {
-    List<HashCode> rowHashes = Lists.newArrayList();
-    for (TableRow row : rows) {
-      List<String> cellsInOneRow = Lists.newArrayList();
-      for (TableCell cell : row.getF()) {
-        cellsInOneRow.add(Objects.toString(cell.getV()));
-        Collections.sort(cellsInOneRow);
-      }
-      rowHashes.add(
-          Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
-    }
-    return Hashing.combineUnordered(rowHashes).toString();
-  }
-
-  @Override
-  public void describeTo(Description description) {
-    description
-        .appendText("Expected checksum is (")
-        .appendText(expectedChecksum)
-        .appendText(")");
-  }
-
-  @Override
-  public void describeMismatchSafely(PipelineResult pResult, Description description) {
-    String info;
-    if (!response.getJobComplete()) {
-      // query job not complete
-      info = String.format("The query job hasn't completed. Got response: %s", response);
-    } else {
-      // checksum mismatch
-      info = String.format("was (%s).%n"
-          + "\tTotal number of rows are: %d.%n"
-          + "\tQueried data details:%s",
-          actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS));
-    }
-    description.appendText(info);
-  }
-
-  private String formatRows(int totalNumRows) {
-    StringBuilder samples = new StringBuilder();
-    List<TableRow> rows = response.getRows();
-    for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
-      samples.append(String.format("%n\t\t"));
-      for (TableCell field : rows.get(i).getF()) {
-        samples.append(String.format("%-10s", field.getV()));
-      }
-    }
-    if (rows.size() > totalNumRows) {
-      samples.append(String.format("%n\t\t..."));
-    }
-    return samples.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 3d3de51..1273442 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -42,14 +42,12 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.TestCredential;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
@@ -400,7 +398,6 @@ public class TestPipeline extends Pipeline implements TestRule {
         if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) {
           options.setRunner(CrashingRunner.class);
         }
-        options.as(GcpOptions.class).setGcpCredential(new TestCredential());
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
deleted file mode 100644
index b0fcbd1..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.lang.reflect.InvocationTargetException;
-
-/** Stores whether we are running within AppEngine or not. */
-public class AppEngineEnvironment {
-  /**
-   * True if running inside of AppEngine, false otherwise.
-   */
-  @Deprecated
-  public static final boolean IS_APP_ENGINE = isAppEngine();
-
-  /**
-   * Attempts to detect whether we are inside of AppEngine.
-   *
-   * <p>Purposely copied and left private from private <a href="https://code.google.com/p/
-   * guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/
-   * MoreExecutors.java#785">code.google.common.util.concurrent.MoreExecutors#isAppEngine</a>.
-   *
-   * @return true if we are inside of AppEngine, false otherwise.
-   */
-  static boolean isAppEngine() {
-    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
-      return false;
-    }
-    try {
-      // If the current environment is null, we're not inside AppEngine.
-      return Class.forName("com.google.apphosting.api.ApiProxy")
-          .getMethod("getCurrentEnvironment")
-          .invoke(null) != null;
-    } catch (ClassNotFoundException e) {
-      // If ApiProxy doesn't exist, we're not on AppEngine at all.
-      return false;
-    } catch (InvocationTargetException e) {
-      // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
-      return false;
-    } catch (IllegalAccessException e) {
-      // If the method isn't accessible, we're not on a supported version of AppEngine;
-      return false;
-    } catch (NoSuchMethodException e) {
-      // If the method doesn't exist, we're not on a supported version of AppEngine;
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
deleted file mode 100644
index 6229650..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.auth.Credentials;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-
-/**
- * Construct an oauth credential to be used by the SDK and the SDK workers.
- */
-public interface CredentialFactory {
-  Credentials getCredential() throws IOException, GeneralSecurityException;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
deleted file mode 100644
index 75954c0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.google.api.services.storage.model.Bucket;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class for handling default GCS buckets.
- */
-public class DefaultBucket {
-  static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
-
-  static final String DEFAULT_REGION = "us-central1";
-
-  /**
-   * Creates a default bucket or verifies the existence and proper access control
-   * of an existing default bucket.  Returns the location if successful.
-   */
-  public static String tryCreateDefaultBucket(PipelineOptions options) {
-    GcsOptions gcpOptions = options.as(GcsOptions.class);
-
-    final String projectId = gcpOptions.getProject();
-    checkArgument(!isNullOrEmpty(projectId),
-                  "--project is a required option.");
-
-    // Look up the project number, to create a default bucket with a stable
-    // name with no special characters.
-    long projectNumber = 0L;
-    try {
-      projectNumber = gcpOptions.as(CloudResourceManagerOptions.class)
-          .getGcpProjectUtil().getProjectNumber(projectId);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to verify project with ID " + projectId, e);
-    }
-    String region = DEFAULT_REGION;
-    if (!isNullOrEmpty(gcpOptions.getZone())) {
-      region = getRegionFromZone(gcpOptions.getZone());
-    }
-    final String bucketName =
-      "dataflow-staging-" + region + "-" + projectNumber;
-    LOG.info("No staging location provided, attempting to use default bucket: {}",
-             bucketName);
-    Bucket bucket = new Bucket()
-      .setName(bucketName)
-      .setLocation(region);
-    // Always try to create the bucket before checking access, so that we do not
-    // race with other pipelines that may be attempting to do the same thing.
-    try {
-      gcpOptions.getGcsUtil().createBucket(projectId, bucket);
-    } catch (FileAlreadyExistsException e) {
-      LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable create default bucket.", e);
-    }
-
-    // Once the bucket is expected to exist, verify that it is correctly owned
-    // by the project executing the job.
-    try {
-      long owner = gcpOptions.getGcsUtil().bucketOwner(
-        GcsPath.fromComponents(bucketName, ""));
-      checkArgument(
-        owner == projectNumber,
-        "Bucket owner does not match the project from --project:"
-        + " %s vs. %s", owner, projectNumber);
-    } catch (IOException e) {
-      throw new RuntimeException(
-        "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
-    }
-    return "gs://" + bucketName;
-  }
-
-  @VisibleForTesting
-  static String getRegionFromZone(String zone) {
-    String[] zoneParts = zone.split("-");
-    checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
-    return zoneParts[0] + "-" + zoneParts[1];
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
deleted file mode 100644
index e1fa18f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.auth.Credentials;
-import com.google.auth.oauth2.GoogleCredentials;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Construct an oauth credential to be used by the SDK and the SDK workers.
- * Returns a GCP credential.
- */
-public class GcpCredentialFactory implements CredentialFactory {
-  /**
-   * The scope cloud-platform provides access to all Cloud Platform resources.
-   * cloud-platform isn't sufficient yet for talking to datastore so we request
-   * those resources separately.
-   *
-   * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for
-   * services we access directly (GCS) as opposed to through the backend
-   * (BigQuery, GCE), we need to explicitly request that scope.
-   */
-  private static final List<String> SCOPES = Arrays.asList(
-      "https://www.googleapis.com/auth/cloud-platform",
-      "https://www.googleapis.com/auth/devstorage.full_control",
-      "https://www.googleapis.com/auth/userinfo.email",
-      "https://www.googleapis.com/auth/datastore",
-      "https://www.googleapis.com/auth/pubsub");
-
-  private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory();
-
-  public static GcpCredentialFactory fromOptions(PipelineOptions options) {
-    return INSTANCE;
-  }
-
-  /**
-   * Returns a default GCP {@link Credentials} or null when it fails.
-   */
-  @Override
-  public Credentials getCredential() {
-    try {
-      return GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
-    } catch (IOException e) {
-      // Ignore the exception
-      // Pipelines that only access to public data should be able to run without credentials.
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
deleted file mode 100644
index f73afe0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.cloudresourcemanager.model.Project;
-import com.google.cloud.hadoop.util.ResilientOperation;
-import com.google.cloud.hadoop.util.RetryDeterminer;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides operations on Google Cloud Platform Projects.
- */
-public class GcpProjectUtil {
-  /**
-   * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using
-   * any transport flags specified on the {@link PipelineOptions}.
-   */
-  public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> {
-    /**
-     * Returns an instance of {@link GcpProjectUtil} based on the
-     * {@link PipelineOptions}.
-     */
-    @Override
-    public GcpProjectUtil create(PipelineOptions options) {
-      LOG.debug("Creating new GcpProjectUtil");
-      CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class);
-      return new GcpProjectUtil(
-          Transport.newCloudResourceManagerClient(crmOptions).build());
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class);
-
-  private static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
-
-  /** Client for the CRM API. */
-  private CloudResourceManager crmClient;
-
-  private GcpProjectUtil(CloudResourceManager crmClient) {
-    this.crmClient = crmClient;
-  }
-
-  // Use this only for testing purposes.
-  @VisibleForTesting
-  void setCrmClient(CloudResourceManager crmClient) {
-    this.crmClient = crmClient;
-  }
-
-  /**
-   * Returns the project number or throws an exception if the project does not
-   * exist or has other access exceptions.
-   */
-  public long getProjectNumber(String projectId) throws IOException {
-    return getProjectNumber(
-      projectId,
-      BACKOFF_FACTORY.backoff(),
-      Sleeper.DEFAULT);
-  }
-
-  /**
-   * Returns the project number or throws an error if the project does not
-   * exist or has other access errors.
-   */
-  @VisibleForTesting
-  long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException {
-      CloudResourceManager.Projects.Get getProject =
-          crmClient.projects().get(projectId);
-      try {
-        Project project = ResilientOperation.retry(
-            ResilientOperation.getGoogleRequestCallable(getProject),
-            backoff,
-            RetryDeterminer.SOCKET_ERRORS,
-            IOException.class,
-            sleeper);
-        return project.getProjectNumber();
-      } catch (Exception e) {
-        throw new IOException("Unable to get project number", e);
-     }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
deleted file mode 100644
index 745dcb9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * Implements IOChannelFactory for GCS.
- */
-public class GcsIOChannelFactory implements IOChannelFactory {
-
-  /**
-   * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}.
-   */
-  public static GcsIOChannelFactory fromOptions(PipelineOptions options) {
-    return new GcsIOChannelFactory(options.as(GcsOptions.class));
-  }
-
-  private final GcsOptions options;
-
-  private GcsIOChannelFactory(GcsOptions options) {
-    this.options = options;
-  }
-
-  @Override
-  public Collection<String> match(String spec) throws IOException {
-    GcsPath path = GcsPath.fromUri(spec);
-    GcsUtil util = options.getGcsUtil();
-    List<GcsPath> matched = util.expand(path);
-
-    List<String> specs = new LinkedList<>();
-    for (GcsPath match : matched) {
-      specs.add(match.toString());
-    }
-
-    return specs;
-  }
-
-  @Override
-  public ReadableByteChannel open(String spec) throws IOException {
-    GcsPath path = GcsPath.fromUri(spec);
-    GcsUtil util = options.getGcsUtil();
-    return util.open(path);
-  }
-
-  @Override
-  public WritableByteChannel create(String spec, String mimeType)
-      throws IOException {
-    GcsPath path = GcsPath.fromUri(spec);
-    GcsUtil util = options.getGcsUtil();
-    return util.create(path, mimeType);
-  }
-
-  @Override
-  public long getSizeBytes(String spec) throws IOException {
-    GcsPath path = GcsPath.fromUri(spec);
-    GcsUtil util = options.getGcsUtil();
-    return util.fileSize(path);
-  }
-
-  @Override
-  public boolean isReadSeekEfficient(String spec) throws IOException {
-    // TODO It is incorrect to return true here for files with content encoding set to gzip.
-    return true;
-  }
-
-  @Override
-  public String resolve(String path, String other) throws IOException {
-    return toPath(path).resolve(other).toString();
-  }
-
-  @Override
-  public Path toPath(String path) {
-    return GcsPath.fromUri(path);
-  }
-
-  @Override
-  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
-      throws IOException {
-    options.getGcsUtil().copy(srcFilenames, destFilenames);
-  }
-
-  @Override
-  public void remove(Collection<String> filesOrDirs) throws IOException {
-    options.getGcsUtil().remove(filesOrDirs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
deleted file mode 100644
index b4c457f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * {@link AutoService} registrar for the {@link GcsIOChannelFactory}.
- */
-@AutoService(IOChannelFactoryRegistrar.class)
-public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar {
-
-  @Override
-  public GcsIOChannelFactory fromOptions(PipelineOptions options) {
-    return GcsIOChannelFactory.fromOptions(options);
-  }
-
-  @Override
-  public String getScheme() {
-    return "gs";
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
deleted file mode 100644
index a5b951d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class GcsPathValidator implements PathValidator {
-
-  private GcsOptions gcpOptions;
-
-  private GcsPathValidator(GcsOptions options) {
-    this.gcpOptions = options;
-  }
-
-  public static GcsPathValidator fromOptions(PipelineOptions options) {
-    return new GcsPathValidator(options.as(GcsOptions.class));
-  }
-
-  /**
-   * Validates the the input GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
-  }
-
-  /**
-   * Validates the the output GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
-    checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
-        errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "Expected a valid 'gs://' path but was given '%s'", path), e);
-    }
-  }
-}