You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/18 23:19:32 UTC
[6/7] beam git commit: [BEAM-1871] Create new GCP core module package and move several GCP related classes from beam-sdks-java-core over.
[BEAM-1871] Create new GCP core module package and move several GCP related classes from beam-sdks-java-core over.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be92f595
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be92f595
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be92f595
Branch: refs/heads/master
Commit: be92f5952706c2cbb980df5073fbc74925ed68e6
Parents: ed52d32
Author: Luke Cwik <lc...@google.com>
Authored: Mon Apr 17 18:02:02 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Apr 18 16:18:53 2017 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 5 +
examples/java8/pom.xml | 5 +
pom.xml | 6 +
runners/flink/examples/pom.xml | 4 +
runners/google-cloud-dataflow-java/pom.xml | 5 +
sdks/java/core/pom.xml | 46 +-
.../beam/sdk/options/BigQueryOptions.java | 32 -
.../options/CloudResourceManagerOptions.java | 40 -
.../DefaultPipelineOptionsRegistrar.java | 5 -
.../org/apache/beam/sdk/options/GcpOptions.java | 227 ------
.../org/apache/beam/sdk/options/GcsOptions.java | 158 ----
.../beam/sdk/options/GoogleApiDebugOptions.java | 87 --
.../beam/sdk/options/PipelineOptions.java | 4 +-
.../apache/beam/sdk/options/PubsubOptions.java | 36 -
.../apache/beam/sdk/runners/PipelineRunner.java | 2 +
.../beam/sdk/testing/BigqueryMatcher.java | 256 ------
.../apache/beam/sdk/testing/TestPipeline.java | 3 -
.../beam/sdk/util/AppEngineEnvironment.java | 62 --
.../apache/beam/sdk/util/CredentialFactory.java | 29 -
.../org/apache/beam/sdk/util/DefaultBucket.java | 105 ---
.../beam/sdk/util/GcpCredentialFactory.java | 67 --
.../apache/beam/sdk/util/GcpProjectUtil.java | 106 ---
.../beam/sdk/util/GcsIOChannelFactory.java | 111 ---
.../sdk/util/GcsIOChannelFactoryRegistrar.java | 38 -
.../apache/beam/sdk/util/GcsPathValidator.java | 95 ---
.../java/org/apache/beam/sdk/util/GcsUtil.java | 798 -------------------
.../util/IntervalBoundedExponentialBackOff.java | 89 ---
.../beam/sdk/util/NoopCredentialFactory.java | 68 --
.../sdk/util/NullCredentialInitializer.java | 62 --
.../org/apache/beam/sdk/util/Transport.java | 178 -----
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 2 -
.../java/org/apache/beam/sdk/io/TextIOTest.java | 97 +--
.../apache/beam/sdk/options/GcpOptionsTest.java | 171 ----
.../sdk/options/GoogleApiDebugOptionsTest.java | 145 ----
.../sdk/options/PipelineOptionsFactoryTest.java | 4 +-
.../beam/sdk/runners/PipelineRunnerTest.java | 46 +-
.../beam/sdk/testing/BigqueryMatcherTest.java | 176 ----
.../beam/sdk/testing/TestPipelineTest.java | 6 +-
.../apache/beam/sdk/util/DefaultBucketTest.java | 112 ---
.../beam/sdk/util/GcpProjectUtilTest.java | 76 --
.../util/GcsIOChannelFactoryRegistrarTest.java | 44 -
.../beam/sdk/util/GcsIOChannelFactoryTest.java | 43 -
.../beam/sdk/util/GcsPathValidatorTest.java | 87 --
.../org/apache/beam/sdk/util/GcsUtilTest.java | 798 -------------------
.../IntervalBoundedExponentialBackOffTest.java | 100 ---
.../util/RetryHttpRequestInitializerTest.java | 290 -------
sdks/java/extensions/gcp-core/pom.xml | 217 +++++
.../beam/sdk/options/BigQueryOptions.java | 32 +
.../options/CloudResourceManagerOptions.java | 40 +
.../org/apache/beam/sdk/options/GcpOptions.java | 227 ++++++
.../options/GcpPipelineOptionsRegistrar.java | 39 +
.../org/apache/beam/sdk/options/GcsOptions.java | 158 ++++
.../beam/sdk/options/GoogleApiDebugOptions.java | 87 ++
.../apache/beam/sdk/options/PubsubOptions.java | 36 +
.../apache/beam/sdk/options/package-info.java | 22 +
.../beam/sdk/testing/BigqueryMatcher.java | 256 ++++++
.../apache/beam/sdk/testing/package-info.java | 21 +
.../beam/sdk/util/AppEngineEnvironment.java | 62 ++
.../apache/beam/sdk/util/CredentialFactory.java | 29 +
.../org/apache/beam/sdk/util/DefaultBucket.java | 105 +++
.../beam/sdk/util/GcpCredentialFactory.java | 67 ++
.../apache/beam/sdk/util/GcpProjectUtil.java | 106 +++
.../beam/sdk/util/GcsIOChannelFactory.java | 111 +++
.../sdk/util/GcsIOChannelFactoryRegistrar.java | 38 +
.../apache/beam/sdk/util/GcsPathValidator.java | 95 +++
.../java/org/apache/beam/sdk/util/GcsUtil.java | 798 +++++++++++++++++++
.../util/IntervalBoundedExponentialBackOff.java | 89 +++
.../beam/sdk/util/NoopCredentialFactory.java | 68 ++
.../sdk/util/NullCredentialInitializer.java | 62 ++
.../org/apache/beam/sdk/util/Transport.java | 178 +++++
.../org/apache/beam/sdk/util/package-info.java | 20 +
.../org/apache/beam/GcpCoreApiSurfaceTest.java | 62 ++
.../apache/beam/sdk/options/GcpOptionsTest.java | 171 ++++
.../sdk/options/GoogleApiDebugOptionsTest.java | 145 ++++
.../beam/sdk/testing/BigqueryMatcherTest.java | 176 ++++
.../apache/beam/sdk/util/DefaultBucketTest.java | 112 +++
.../beam/sdk/util/GcpProjectUtilTest.java | 76 ++
.../util/GcsIOChannelFactoryRegistrarTest.java | 44 +
.../beam/sdk/util/GcsIOChannelFactoryTest.java | 43 +
.../beam/sdk/util/GcsPathValidatorTest.java | 87 ++
.../org/apache/beam/sdk/util/GcsUtilTest.java | 798 +++++++++++++++++++
.../IntervalBoundedExponentialBackOffTest.java | 100 +++
.../util/RetryHttpRequestInitializerTest.java | 290 +++++++
sdks/java/extensions/pom.xml | 1 +
sdks/java/harness/pom.xml | 5 +
sdks/java/io/google-cloud-platform/pom.xml | 20 +-
86 files changed, 5130 insertions(+), 4889 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 021a819..ae3d63d 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -461,6 +461,11 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 912c341..cd69acb 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -208,6 +208,11 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09f3985..306978d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -357,6 +357,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-sorter</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index fa642bd..aaf76d9 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -65,6 +65,10 @@
</profiles>
<dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index ff63a31..68d433a 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -171,6 +171,11 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-common-runner-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 0ac40f4..4ba8e3b 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -145,38 +145,30 @@
</dependency>
<dependency>
- <groupId>com.google.auth</groupId>
- <artifactId>google-auth-library-oauth2-http</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.api-client</groupId>
- <artifactId>google-api-client</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-bigquery</artifactId>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
- <artifactId>google-api-services-cloudresourcemanager</artifactId>
+ <artifactId>google-api-services-storage</artifactId>
</dependency>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-pubsub</artifactId>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
</dependency>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-storage</artifactId>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>gcsio</artifactId>
+ <scope>runtime</scope>
</dependency>
<dependency>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client</artifactId>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>util</artifactId>
+ <scope>runtime</scope>
</dependency>
<!-- Required by com.google.apis:google-api-services-datastore-protobuf, but
@@ -189,31 +181,11 @@
<dependency>
<groupId>com.google.http-client</groupId>
- <artifactId>google-http-client-jackson2</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.http-client</groupId>
<artifactId>google-http-client-protobuf</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
- <groupId>com.google.oauth-client</groupId>
- <artifactId>google-oauth-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.cloud.bigdataoss</groupId>
- <artifactId>gcsio</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.cloud.bigdataoss</groupId>
- <artifactId>util</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
deleted file mode 100644
index 7672cd7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties needed when using Google BigQuery with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google BigQuery. See "
- + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
- @Description("Temporary dataset for BigQuery table operations. "
- + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
- @Default.String("bigquery.googleapis.com/cloud_dataflow")
- String getTempDatasetId();
- void setTempDatasetId(String value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
deleted file mode 100644
index 13fdaf3..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.beam.sdk.util.GcpProjectUtil;
-
-/**
- * Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google CloudResourceManager. See "
- + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
-public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
- /**
- * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage.
- */
- @JsonIgnore
- @Description("The GcpProjectUtil instance that should be used to communicate"
- + " with Google Cloud Resource Manager.")
- @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
- @Hidden
- GcpProjectUtil getGcpProjectUtil();
- void setGcpProjectUtil(GcpProjectUtil value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
index 069c109..b0ce812 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
@@ -32,11 +32,6 @@ public class DefaultPipelineOptionsRegistrar implements PipelineOptionsRegistrar
.add(PipelineOptions.class)
.add(ApplicationNameOptions.class)
.add(StreamingOptions.class)
- .add(BigQueryOptions.class)
- .add(GcpOptions.class)
- .add(GcsOptions.class)
- .add(GoogleApiDebugOptions.class)
- .add(PubsubOptions.class)
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
deleted file mode 100644
index d01406f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.auth.Credentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
-import java.util.Locale;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.CredentialFactory;
-import org.apache.beam.sdk.util.DefaultBucket;
-import org.apache.beam.sdk.util.GcpCredentialFactory;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Options used to configure Google Cloud Platform specific options such as the project
- * and credentials.
- *
- * <p>These options defer to the
- * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
- * application default credentials</a> for authentication. See the
- * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for
- * alternative mechanisms for creating credentials.
- */
-@Description("Options used to configure Google Cloud Platform project and credentials.")
-public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
- /**
- * Project id to use when launching jobs.
- */
- @Description("Project id. Required when using Google Cloud Platform services. "
- + "See https://cloud.google.com/storage/docs/projects for further details.")
- @Default.InstanceFactory(DefaultProjectFactory.class)
- String getProject();
- void setProject(String value);
-
- /**
- * GCP <a href="https://developers.google.com/compute/docs/zones"
- * >availability zone</a> for operations.
- *
- * <p>Default is set on a per-service basis.
- */
- @Description("GCP availability zone for running GCP operations. "
- + "Default is up to the individual service.")
- String getZone();
- void setZone(String value);
-
- /**
- * The class of the credential factory that should be created and used to create
- * credentials. If gcpCredential has not been set explicitly, an instance of this class will
- * be constructed and used as a credential factory.
- */
- @Description("The class of the credential factory that should be created and used to create "
- + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
- + "be constructed and used as a credential factory.")
- @Default.Class(GcpCredentialFactory.class)
- Class<? extends CredentialFactory> getCredentialFactoryClass();
- void setCredentialFactoryClass(
- Class<? extends CredentialFactory> credentialFactoryClass);
-
- /**
- * The credential instance that should be used to authenticate against GCP services.
- * If no credential has been set explicitly, the default is to use the instance factory
- * that constructs a credential based upon the currently set credentialFactoryClass.
- */
- @JsonIgnore
- @Description("The credential instance that should be used to authenticate against GCP services. "
- + "If no credential has been set explicitly, the default is to use the instance factory "
- + "that constructs a credential based upon the currently set credentialFactoryClass.")
- @Default.InstanceFactory(GcpUserCredentialsFactory.class)
- Credentials getGcpCredential();
- void setGcpCredential(Credentials value);
-
- /**
- * Attempts to infer the default project based upon the environment this application
- * is executing within. Currently this only supports getting the default project from gcloud.
- */
- class DefaultProjectFactory implements DefaultValueFactory<String> {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
-
- @Override
- public String create(PipelineOptions options) {
- try {
- File configFile;
- if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
- configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
- } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
- configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
- } else {
- // New versions of gcloud use this file
- configFile = new File(
- System.getProperty("user.home"),
- ".config/gcloud/configurations/config_default");
- if (!configFile.exists()) {
- // Old versions of gcloud use this file
- configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
- }
- }
- String section = null;
- Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
- Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
- for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
- line = line.trim();
- if (line.isEmpty() || line.startsWith(";")) {
- continue;
- }
- Matcher matcher = sectionPattern.matcher(line);
- if (matcher.matches()) {
- section = matcher.group(1);
- } else if (section == null || section.equals("core")) {
- matcher = projectPattern.matcher(line);
- if (matcher.matches()) {
- String project = matcher.group(1).trim();
- LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
- + "project, please cancel this Pipeline and specify the command-line "
- + "argument --project.", project);
- return project;
- }
- }
- }
- } catch (IOException expected) {
- LOG.debug("Failed to find default project.", expected);
- }
- // return null if can't determine
- return null;
- }
-
- /**
- * Returns true if running on the Windows OS.
- */
- private static boolean isWindows() {
- return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
- }
-
- /**
- * Used to mock out getting environment variables.
- */
- @VisibleForTesting
- Map<String, String> getEnvironment() {
- return System.getenv();
- }
- }
-
- /**
- * Attempts to load the GCP credentials. See
- * {@link CredentialFactory#getCredential()} for more details.
- */
- class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> {
- @Override
- public Credentials create(PipelineOptions options) {
- GcpOptions gcpOptions = options.as(GcpOptions.class);
- try {
- CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
- .fromClass(gcpOptions.getCredentialFactoryClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- return factory.getCredential();
- } catch (IOException | GeneralSecurityException e) {
- throw new RuntimeException("Unable to obtain credential", e);
- }
- }
- }
-
- /**
- * A GCS path for storing temporary files in GCP.
- *
- * <p>Its default to {@link PipelineOptions#getTempLocation}.
- */
- @Description("A GCS path for storing temporary files in GCP.")
- @Default.InstanceFactory(GcpTempLocationFactory.class)
- @Nullable String getGcpTempLocation();
- void setGcpTempLocation(String value);
-
- /**
- * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
- */
- class GcpTempLocationFactory implements DefaultValueFactory<String> {
-
- @Override
- @Nullable
- public String create(PipelineOptions options) {
- String tempLocation = options.getTempLocation();
- if (isNullOrEmpty(tempLocation)) {
- tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
- options.setTempLocation(tempLocation);
- } else {
- try {
- PathValidator validator = options.as(GcsOptions.class).getPathValidator();
- validator.validateOutputFilePrefixSupported(tempLocation);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format(
- "Error constructing default value for gcpTempLocation: tempLocation is not"
- + " a valid GCS path, %s. ", tempLocation), e);
- }
- }
- return tempLocation;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
deleted file mode 100644
index 2187e7d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.AppEngineEnvironment;
-import org.apache.beam.sdk.util.GcsPathValidator;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
-
-/**
- * Options used to configure Google Cloud Storage.
- */
-public interface GcsOptions extends
- ApplicationNameOptions, GcpOptions, PipelineOptions {
- /**
- * The GcsUtil instance that should be used to communicate with Google Cloud Storage.
- */
- @JsonIgnore
- @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
- @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
- @Hidden
- GcsUtil getGcsUtil();
- void setGcsUtil(GcsUtil value);
-
- /**
- * The ExecutorService instance to use to create threads, can be overridden to specify an
- * ExecutorService that is compatible with the users environment. If unset, the
- * default is to create an ExecutorService with an unbounded number of threads; this
- * is compatible with Google AppEngine.
- */
- @JsonIgnore
- @Description("The ExecutorService instance to use to create multiple threads. Can be overridden "
- + "to specify an ExecutorService that is compatible with the users environment. If unset, "
- + "the default is to create an ExecutorService with an unbounded number of threads; this "
- + "is compatible with Google AppEngine.")
- @Default.InstanceFactory(ExecutorServiceFactory.class)
- @Hidden
- ExecutorService getExecutorService();
- void setExecutorService(ExecutorService value);
-
- /**
- * GCS endpoint to use. If unspecified, uses the default endpoint.
- */
- @JsonIgnore
- @Hidden
- @Description("The URL for the GCS API.")
- String getGcsEndpoint();
- void setGcsEndpoint(String value);
-
- /**
- * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
- * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
- * restrictions and performance implications of this value.
- */
- @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the "
- + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
- + "information on the restrictions and performance implications of this value.\n\n"
- + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
- + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
- @Nullable
- Integer getGcsUploadBufferSizeBytes();
- void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
-
- /**
- * The class of the validator that should be created and used to validate paths.
- * If pathValidator has not been set explicitly, an instance of this class will be
- * constructed and used as the path validator.
- */
- @Description("The class of the validator that should be created and used to validate paths. "
- + "If pathValidator has not been set explicitly, an instance of this class will be "
- + "constructed and used as the path validator.")
- @Default.Class(GcsPathValidator.class)
- Class<? extends PathValidator> getPathValidatorClass();
- void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
- /**
- * The path validator instance that should be used to validate paths.
- * If no path validator has been set explicitly, the default is to use the instance factory that
- * constructs a path validator based upon the currently set pathValidatorClass.
- */
- @JsonIgnore
- @Description("The path validator instance that should be used to validate paths. "
- + "If no path validator has been set explicitly, the default is to use the instance factory "
- + "that constructs a path validator based upon the currently set pathValidatorClass.")
- @Default.InstanceFactory(PathValidatorFactory.class)
- PathValidator getPathValidator();
- void setPathValidator(PathValidator validator);
-
- /**
- * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The
- * {@link ExecutorService} is compatible with AppEngine.
- */
- class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
- @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only.
- @Override
- public ExecutorService create(PipelineOptions options) {
- ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
- threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
- if (!AppEngineEnvironment.IS_APP_ENGINE) {
- // AppEngine doesn't allow modification of threads to be daemon threads.
- threadFactoryBuilder.setDaemon(true);
- }
- /* The SDK requires an unbounded thread pool because a step may create X writers
- * each requiring their own thread to perform the writes otherwise a writer may
- * block causing deadlock for the step because the writers buffer is full.
- * Also, the MapTaskExecutor launches the steps in reverse order and completes
- * them in forward order thus requiring enough threads so that each step's writers
- * can be active.
- */
- return new ThreadPoolExecutor(
- 0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
- Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
- new SynchronousQueue<Runnable>(),
- threadFactoryBuilder.build());
- }
- }
-
- /**
- * Creates a {@link PathValidator} object using the class specified in
- * {@link #getPathValidatorClass()}.
- */
- class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
- @Override
- public PathValidator create(PipelineOptions options) {
- GcsOptions gcsOptions = options.as(GcsOptions.class);
- return InstanceBuilder.ofType(PathValidator.class)
- .fromClass(gcsOptions.getPathValidatorClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
deleted file mode 100644
index f9cb575..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.google.api.client.googleapis.services.AbstractGoogleClient;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.googleapis.services.GoogleClientRequestInitializer;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * These options configure debug settings for Google API clients created within the Apache Beam SDK.
- */
-public interface GoogleApiDebugOptions extends PipelineOptions {
-  /**
-   * This option enables tracing of API calls to Google services used within the Apache
-   * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...}
-   * </code> where {@code ApiName} is matched as a substring of the canonical name of each
-   * request's class. The {@code TraceDestination} is a logical trace consumer to whom the trace
-   * will be reported. Typically, "producer" is the right destination to use: this makes API traces
-   * available to the team offering the API. Note that by enabling this option, the contents of the
-   * requests to and from Google Cloud services will be made available to Google. For example, by
-   * specifying <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made
-   * available to Google, specifically to the Google Cloud Dataflow team.
-   */
-  @Description("This option enables tracing of API calls to Google services used within the Apache "
-      + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} "
-      + "where the ApiName represents the request classes canonical name. The TraceDestination is "
-      + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is "
-      + "the right destination to use: this makes API traces available to the team offering the "
-      + "API. Note that by enabling this option, the contents of the requests to and from "
-      + "Google Cloud services will be made available to Google. For example, by specifying "
-      + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to "
-      + "Google, specifically to the Google Cloud Dataflow team.")
-  GoogleApiTracer getGoogleApiTrace();
-  void setGoogleApiTrace(GoogleApiTracer commands);
-
-  /**
-   * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls.
-   *
-   * <p>Each entry maps a class-name fragment to a trace destination. When a request is
-   * initialized, every entry whose key is contained in the canonical name of the request's class
-   * sets the request's {@code $trace} parameter. If several keys match, they are applied in
-   * {@link HashMap} iteration order, which is unspecified.
-   */
-  class GoogleApiTracer extends HashMap<String, String>
-      implements GoogleClientRequestInitializer {
-    /**
-     * Registers {@code traceDestination} for every request whose class's canonical name contains
-     * the canonical name of {@code client}'s class.
-     *
-     * @return this tracer, for chaining
-     */
-    public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) {
-      put(client.getClass().getCanonicalName(), traceDestination);
-      return this;
-    }
-
-    /**
-     * Registers {@code traceDestination} for every request whose class's canonical name contains
-     * the canonical name of {@code request}'s class.
-     *
-     * @return this tracer, for chaining
-     */
-    public GoogleApiTracer addTraceFor(
-        AbstractGoogleClientRequest<?> request, String traceDestination) {
-      put(request.getClass().getCanonicalName(), traceDestination);
-      return this;
-    }
-
-    @Override
-    public void initialize(AbstractGoogleClientRequest<?> request) throws IOException {
-      // getCanonicalName() is null for anonymous and local classes; such a request cannot match
-      // any entry, and calling contains() on null would throw.
-      String requestClassName = request.getClass().getCanonicalName();
-      if (requestClassName == null) {
-        return;
-      }
-      for (Map.Entry<String, String> entry : this.entrySet()) {
-        // A null key (e.g. registered via addTraceFor with an anonymous class) would make
-        // String.contains throw, so it is skipped.
-        String classNameFragment = entry.getKey();
-        if (classNameFragment != null && requestClassName.contains(classNameFragment)) {
-          request.set("$trace", entry.getValue());
-        }
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 4e7bc89..88d6576 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -187,8 +186,7 @@ import org.joda.time.format.DateTimeFormatter;
* <a href="https://github.com/FasterXML/jackson-annotations">annotations</a> to aid in
* serialization of custom types. We point you to the public
* <a href="https://github.com/FasterXML/jackson">Jackson documentation</a> when attempting
- * to add serialization support for your custom types. See {@link GoogleApiTracer} for an
- * example using the Jackson annotations to serialize and deserialize a custom type.
+ * to add serialization support for your custom types.
*
* <p>Note: It is an error to have the same property available in multiple interfaces with only
* some of them being annotated with {@link JsonIgnore @JsonIgnore}. It is also an error to mark a
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
deleted file mode 100644
index b065d19..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-/**
- * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
- */
-@Description("Options that are used to configure Google Cloud Pub/Sub. See "
- + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
-public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
-
- /**
- * Root URL for use with the Google Cloud Pub/Sub API.
- */
- @Description("Root URL for use with the Google Cloud Pub/Sub API")
- @Default.String("https://pubsub.googleapis.com")
- @Hidden
- String getPubsubRootUrl();
- void setPubsubRootUrl(String value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 7b2fba3..a318dfc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
@@ -41,6 +42,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
*/
public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) {
checkNotNull(options);
+ PipelineOptionsValidator.validate(PipelineOptions.class, options);
// (Re-)register standard IO factories. Clobbers any prior credentials.
IOChannelUtils.registerIOFactoriesAllowOverride(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
deleted file mode 100644
index 8f752c0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.BigqueryScopes;
-import com.google.api.services.bigquery.model.QueryRequest;
-import com.google.api.services.bigquery.model.QueryResponse;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Transport;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A matcher to verify data in BigQuery by processing given query
- * and comparing with content's checksum.
- *
- * <p>Example:
- * <pre>{@code [
- * assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum));
- * ]}</pre>
- */
-@NotThreadSafe
-public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
- implements SerializableMatcher<PipelineResult> {
- private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class);
-
- // The maximum number of retries to execute a BigQuery RPC
- static final int MAX_QUERY_RETRIES = 4;
-
- // The initial backoff for executing a BigQuery RPC
- private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L);
-
- // The total number of rows in query response to be formatted for debugging purpose
- private static final int TOTAL_FORMATTED_ROWS = 20;
-
- // The backoff factory with initial configs
- static final FluentBackoff BACKOFF_FACTORY =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_QUERY_RETRIES)
- .withInitialBackoff(INITIAL_BACKOFF);
-
- private final String applicationName;
- private final String projectId;
- private final String query;
- private final String expectedChecksum;
- private String actualChecksum;
- private transient QueryResponse response;
-
- public BigqueryMatcher(
- String applicationName, String projectId, String query, String expectedChecksum) {
- validateArgument("applicationName", applicationName);
- validateArgument("projectId", projectId);
- validateArgument("query", query);
- validateArgument("expectedChecksum", expectedChecksum);
-
- this.applicationName = applicationName;
- this.projectId = projectId;
- this.query = query;
- this.expectedChecksum = expectedChecksum;
- }
-
- @Override
- protected boolean matchesSafely(PipelineResult pipelineResult) {
- LOG.info("Verifying Bigquery data");
- Bigquery bigqueryClient = newBigqueryClient(applicationName);
-
- // execute query
- LOG.debug("Executing query: {}", query);
- try {
- QueryRequest queryContent = new QueryRequest();
- queryContent.setQuery(query);
-
- response = queryWithRetries(
- bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedIOException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException("Failed to fetch BigQuery data.", e);
- }
-
- if (!response.getJobComplete()) {
- // query job not complete, verification failed
- return false;
- } else {
- // compute checksum
- actualChecksum = generateHash(response.getRows());
- LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
-
- return expectedChecksum.equals(actualChecksum);
- }
- }
-
- @VisibleForTesting
- Bigquery newBigqueryClient(String applicationName) {
- HttpTransport transport = Transport.getTransport();
- JsonFactory jsonFactory = Transport.getJsonFactory();
- Credentials credential = getDefaultCredential();
-
- return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential))
- .setApplicationName(applicationName)
- .build();
- }
-
- @Nonnull
- @VisibleForTesting
- QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
- Sleeper sleeper, BackOff backOff)
- throws IOException, InterruptedException {
- IOException lastException = null;
- do {
- if (lastException != null) {
- LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
- }
- try {
- QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
- if (response != null) {
- return response;
- } else {
- lastException =
- new IOException("Expected valid response from query job, but received null.");
- }
- } catch (IOException e) {
- // ignore and retry
- lastException = e;
- }
- } while(BackOffUtils.next(sleeper, backOff));
-
- throw new RuntimeException(
- String.format(
- "Unable to get BigQuery response after retrying %d times using query (%s)",
- MAX_QUERY_RETRIES,
- queryContent.getQuery()),
- lastException);
- }
-
- private void validateArgument(String name, String value) {
- checkArgument(
- !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value);
- }
-
- private Credentials getDefaultCredential() {
- GoogleCredentials credential;
- try {
- credential = GoogleCredentials.getApplicationDefault();
- } catch (IOException e) {
- throw new RuntimeException("Failed to get application default credential.", e);
- }
-
- if (credential.createScopedRequired()) {
- Collection<String> bigqueryScope =
- Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY);
- credential = credential.createScoped(bigqueryScope);
- }
- return credential;
- }
-
- private String generateHash(@Nonnull List<TableRow> rows) {
- List<HashCode> rowHashes = Lists.newArrayList();
- for (TableRow row : rows) {
- List<String> cellsInOneRow = Lists.newArrayList();
- for (TableCell cell : row.getF()) {
- cellsInOneRow.add(Objects.toString(cell.getV()));
- Collections.sort(cellsInOneRow);
- }
- rowHashes.add(
- Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
- }
- return Hashing.combineUnordered(rowHashes).toString();
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("Expected checksum is (")
- .appendText(expectedChecksum)
- .appendText(")");
- }
-
- @Override
- public void describeMismatchSafely(PipelineResult pResult, Description description) {
- String info;
- if (!response.getJobComplete()) {
- // query job not complete
- info = String.format("The query job hasn't completed. Got response: %s", response);
- } else {
- // checksum mismatch
- info = String.format("was (%s).%n"
- + "\tTotal number of rows are: %d.%n"
- + "\tQueried data details:%s",
- actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS));
- }
- description.appendText(info);
- }
-
- private String formatRows(int totalNumRows) {
- StringBuilder samples = new StringBuilder();
- List<TableRow> rows = response.getRows();
- for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
- samples.append(String.format("%n\t\t"));
- for (TableCell field : rows.get(i).getF()) {
- samples.append(String.format("%-10s", field.getV()));
- }
- }
- if (rows.size() > totalNumRows) {
- samples.append(String.format("%n\t\t..."));
- }
- return samples.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 3d3de51..1273442 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -42,14 +42,12 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.TestCredential;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
@@ -400,7 +398,6 @@ public class TestPipeline extends Pipeline implements TestRule {
if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) {
options.setRunner(CrashingRunner.class);
}
- options.as(GcpOptions.class).setGcpCredential(new TestCredential());
}
options.setStableUniqueNames(CheckEnabled.ERROR);
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
deleted file mode 100644
index b0fcbd1..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.lang.reflect.InvocationTargetException;
-
-/** Stores whether we are running within AppEngine or not. */
-public class AppEngineEnvironment {
- /**
- * True if running inside of AppEngine, false otherwise.
- */
- @Deprecated
- public static final boolean IS_APP_ENGINE = isAppEngine();
-
- /**
- * Attempts to detect whether we are inside of AppEngine.
- *
- * <p>Purposely copied and left private from private <a href="https://code.google.com/p/
- * guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/
- * MoreExecutors.java#785">code.google.common.util.concurrent.MoreExecutors#isAppEngine</a>.
- *
- * @return true if we are inside of AppEngine, false otherwise.
- */
- static boolean isAppEngine() {
- if (System.getProperty("com.google.appengine.runtime.environment") == null) {
- return false;
- }
- try {
- // If the current environment is null, we're not inside AppEngine.
- return Class.forName("com.google.apphosting.api.ApiProxy")
- .getMethod("getCurrentEnvironment")
- .invoke(null) != null;
- } catch (ClassNotFoundException e) {
- // If ApiProxy doesn't exist, we're not on AppEngine at all.
- return false;
- } catch (InvocationTargetException e) {
- // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
- return false;
- } catch (IllegalAccessException e) {
- // If the method isn't accessible, we're not on a supported version of AppEngine;
- return false;
- } catch (NoSuchMethodException e) {
- // If the method doesn't exist, we're not on a supported version of AppEngine;
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
deleted file mode 100644
index 6229650..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.auth.Credentials;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-
-/**
- * Supplies the OAuth {@link Credentials} used by the SDK and by the SDK workers.
- */
-public interface CredentialFactory {
-  /**
-   * Produces a credential.
-   *
-   * <p>Implementations may return {@code null}; for example {@code GcpCredentialFactory} does so
-   * when application-default credentials cannot be loaded.
-   *
-   * @return the credential to use, possibly {@code null}
-   * @throws IOException if obtaining the credential fails with an I/O error
-   * @throws GeneralSecurityException if a security failure prevents building the credential
-   */
-  Credentials getCredential() throws IOException, GeneralSecurityException;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
deleted file mode 100644
index 75954c0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.google.api.services.storage.model.Bucket;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class for handling default GCS buckets.
- */
-public class DefaultBucket {
-  static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
-
-  static final String DEFAULT_REGION = "us-central1";
-
-  /**
-   * Creates a default bucket or verifies the existence and proper access control
-   * of an existing default bucket. Returns the location if successful.
-   *
-   * <p>The bucket is named {@code dataflow-staging-<region>-<projectNumber>}. The region is
-   * derived from the configured zone when one is set, and is {@link #DEFAULT_REGION} otherwise.
-   * Creation is always attempted first, so concurrent pipelines do not race on the check.
-   *
-   * @param options pipeline options, viewed as {@link GcsOptions}; the project must be set
-   * @return the bucket location, in the form {@code gs://<bucketName>}
-   * @throws IllegalArgumentException if the project is missing, the zone is malformed, or the
-   *     bucket is owned by a different project
-   * @throws RuntimeException if looking up the project number, creating the bucket, or reading
-   *     the bucket owner fails with an {@link IOException}
-   */
-  public static String tryCreateDefaultBucket(PipelineOptions options) {
-    GcsOptions gcsOptions = options.as(GcsOptions.class);
-
-    final String projectId = gcsOptions.getProject();
-    checkArgument(!isNullOrEmpty(projectId),
-        "--project is a required option.");
-
-    // Look up the project number, to create a default bucket with a stable
-    // name with no special characters.
-    final long projectNumber;
-    try {
-      projectNumber = gcsOptions.as(CloudResourceManagerOptions.class)
-          .getGcpProjectUtil().getProjectNumber(projectId);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to verify project with ID " + projectId, e);
-    }
-    String region = DEFAULT_REGION;
-    if (!isNullOrEmpty(gcsOptions.getZone())) {
-      region = getRegionFromZone(gcsOptions.getZone());
-    }
-    final String bucketName =
-        "dataflow-staging-" + region + "-" + projectNumber;
-    LOG.info("No staging location provided, attempting to use default bucket: {}",
-        bucketName);
-    Bucket bucket = new Bucket()
-        .setName(bucketName)
-        .setLocation(region);
-    // Always try to create the bucket before checking access, so that we do not
-    // race with other pipelines that may be attempting to do the same thing.
-    try {
-      gcsOptions.getGcsUtil().createBucket(projectId, bucket);
-    } catch (FileAlreadyExistsException e) {
-      LOG.debug("Bucket '{}' already exists, verifying access.", bucketName);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create default bucket.", e);
-    }
-
-    // Once the bucket is expected to exist, verify that it is correctly owned
-    // by the project executing the job.
-    try {
-      long owner = gcsOptions.getGcsUtil().bucketOwner(
-          GcsPath.fromComponents(bucketName, ""));
-      checkArgument(
-          owner == projectNumber,
-          "Bucket owner does not match the project from --project:"
-          + " %s vs. %s", owner, projectNumber);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
-    }
-    return "gs://" + bucketName;
-  }
-
-  /**
-   * Returns the region portion of a zone name, i.e. its first two dash-separated parts
-   * (for example {@code us-central1-f} yields {@code us-central1}).
-   *
-   * @throws IllegalArgumentException if {@code zone} has fewer than two dash-separated parts
-   */
-  @VisibleForTesting
-  static String getRegionFromZone(String zone) {
-    String[] zoneParts = zone.split("-");
-    checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
-    return zoneParts[0] + "-" + zoneParts[1];
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
deleted file mode 100644
index e1fa18f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.auth.Credentials;
-import com.google.auth.oauth2.GoogleCredentials;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Construct an oauth credential to be used by the SDK and the SDK workers.
- * Returns a GCP credential.
- */
-public class GcpCredentialFactory implements CredentialFactory {
- /**
- * The scope cloud-platform provides access to all Cloud Platform resources.
- * cloud-platform isn't sufficient yet for talking to datastore so we request
- * those resources separately.
- *
- * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for
- * services we access directly (GCS) as opposed to through the backend
- * (BigQuery, GCE), we need to explicitly request that scope.
- */
- private static final List<String> SCOPES = Arrays.asList(
- "https://www.googleapis.com/auth/cloud-platform",
- "https://www.googleapis.com/auth/devstorage.full_control",
- "https://www.googleapis.com/auth/userinfo.email",
- "https://www.googleapis.com/auth/datastore",
- "https://www.googleapis.com/auth/pubsub");
-
- private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory();
-
- public static GcpCredentialFactory fromOptions(PipelineOptions options) {
- return INSTANCE;
- }
-
- /**
- * Returns a default GCP {@link Credentials} or null when it fails.
- */
- @Override
- public Credentials getCredential() {
- try {
- return GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
- } catch (IOException e) {
- // Ignore the exception
- // Pipelines that only access to public data should be able to run without credentials.
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
deleted file mode 100644
index f73afe0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.cloudresourcemanager.model.Project;
-import com.google.cloud.hadoop.util.ResilientOperation;
-import com.google.cloud.hadoop.util.RetryDeterminer;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides operations on Google Cloud Platform Projects.
- */
-public class GcpProjectUtil {
-  /**
-   * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using
-   * any transport flags specified on the {@link PipelineOptions}.
-   */
-  public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> {
-    /**
-     * Returns a {@link GcpProjectUtil} whose Cloud Resource Manager client is built from the
-     * {@link CloudResourceManagerOptions} view of {@code options}.
-     */
-    @Override
-    public GcpProjectUtil create(PipelineOptions options) {
-      LOG.debug("Creating new GcpProjectUtil");
-      CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class);
-      return new GcpProjectUtil(
-          Transport.newCloudResourceManagerClient(crmOptions).build());
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class);
-
-  /** Backoff policy for project lookups: at most 3 retries, starting from a 200ms backoff. */
-  private static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
-
-  /** Client for the CRM API; non-final so tests can replace it via {@link #setCrmClient}. */
-  private CloudResourceManager crmClient;
-
-  private GcpProjectUtil(CloudResourceManager crmClient) {
-    this.crmClient = crmClient;
-  }
-
-  // Use this only for testing purposes.
-  @VisibleForTesting
-  void setCrmClient(CloudResourceManager crmClient) {
-    this.crmClient = crmClient;
-  }
-
-  /**
-   * Returns the project number or throws an exception if the project does not
-   * exist or has other access exceptions.
-   *
-   * @param projectId the GCP project id to look up
-   * @return the numeric project number
-   * @throws IOException if the lookup fails, is interrupted, or yields no project number
-   */
-  public long getProjectNumber(String projectId) throws IOException {
-    return getProjectNumber(projectId, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
-  }
-
-  /**
-   * Returns the project number or throws an error if the project does not
-   * exist or has other access errors.
-   *
-   * <p>The request is retried with {@code backoff}/{@code sleeper} according to
-   * {@link RetryDeterminer#SOCKET_ERRORS}. Every failure, including unchecked ones, is
-   * reported as an {@link IOException}. If the calling thread is interrupted while waiting
-   * between retries, its interrupt flag is restored before the {@link IOException} is thrown.
-   */
-  @VisibleForTesting
-  long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException {
-    String failureMessage =
-        String.format("Unable to get project number for project %s", projectId);
-    CloudResourceManager.Projects.Get getProject = crmClient.projects().get(projectId);
-    Project project;
-    try {
-      project = ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(getProject),
-          backoff,
-          RetryDeterminer.SOCKET_ERRORS,
-          IOException.class,
-          sleeper);
-    } catch (InterruptedException e) {
-      // Catching the exception cleared the interrupt flag; restore it so callers can react.
-      Thread.currentThread().interrupt();
-      throw new IOException(failureMessage, e);
-    } catch (Exception e) {
-      throw new IOException(failureMessage, e);
-    }
-    // getProjectNumber() is boxed; report a missing value explicitly instead of via an NPE.
-    Long projectNumber = project == null ? null : project.getProjectNumber();
-    if (projectNumber == null) {
-      throw new IOException(failureMessage + ": no project number was returned");
-    }
-    return projectNumber;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
deleted file mode 100644
index 745dcb9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * {@link IOChannelFactory} for Google Cloud Storage. Each spec is parsed as a {@link GcsPath},
- * and the operation is delegated to the {@link GcsUtil} obtained from {@link GcsOptions}.
- */
-public class GcsIOChannelFactory implements IOChannelFactory {
-
-  /** Options from which the {@link GcsUtil} is fetched for every operation. */
-  private final GcsOptions gcsOptions;
-
-  private GcsIOChannelFactory(GcsOptions gcsOptions) {
-    this.gcsOptions = gcsOptions;
-  }
-
-  /**
-   * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}.
-   */
-  public static GcsIOChannelFactory fromOptions(PipelineOptions options) {
-    GcsOptions asGcs = options.as(GcsOptions.class);
-    return new GcsIOChannelFactory(asGcs);
-  }
-
-  /** Looks up the {@link GcsUtil} from the options; called once per operation. */
-  private GcsUtil gcsUtil() {
-    return gcsOptions.getGcsUtil();
-  }
-
-  @Override
-  public Collection<String> match(String spec) throws IOException {
-    // Parse first so an invalid spec fails before the GcsUtil is requested.
-    GcsPath pattern = GcsPath.fromUri(spec);
-    List<GcsPath> expansions = gcsUtil().expand(pattern);
-    List<String> matches = new LinkedList<>();
-    for (GcsPath expansion : expansions) {
-      matches.add(expansion.toString());
-    }
-    return matches;
-  }
-
-  @Override
-  public ReadableByteChannel open(String spec) throws IOException {
-    GcsPath source = GcsPath.fromUri(spec);
-    return gcsUtil().open(source);
-  }
-
-  @Override
-  public WritableByteChannel create(String spec, String mimeType) throws IOException {
-    GcsPath destination = GcsPath.fromUri(spec);
-    return gcsUtil().create(destination, mimeType);
-  }
-
-  @Override
-  public long getSizeBytes(String spec) throws IOException {
-    GcsPath object = GcsPath.fromUri(spec);
-    return gcsUtil().fileSize(object);
-  }
-
-  @Override
-  public boolean isReadSeekEfficient(String spec) throws IOException {
-    // TODO: answering true is wrong for objects whose content encoding is gzip.
-    return true;
-  }
-
-  @Override
-  public String resolve(String path, String other) throws IOException {
-    Path base = toPath(path);
-    return base.resolve(other).toString();
-  }
-
-  @Override
-  public Path toPath(String path) {
-    return GcsPath.fromUri(path);
-  }
-
-  @Override
-  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
-      throws IOException {
-    gcsUtil().copy(srcFilenames, destFilenames);
-  }
-
-  @Override
-  public void remove(Collection<String> filesOrDirs) throws IOException {
-    gcsUtil().remove(filesOrDirs);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
deleted file mode 100644
index b4c457f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * {@link AutoService}-discovered {@link IOChannelFactoryRegistrar} that maps the {@code gs}
- * scheme to a {@link GcsIOChannelFactory}.
- */
-@AutoService(IOChannelFactoryRegistrar.class)
-public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar {
-
-  /** Returns the scheme this registrar handles: {@code "gs"}. */
-  @Override
-  public String getScheme() {
-    return "gs";
-  }
-
-  /** Builds the factory by delegating to {@link GcsIOChannelFactory#fromOptions}. */
-  @Override
-  public GcsIOChannelFactory fromOptions(PipelineOptions options) {
-    return GcsIOChannelFactory.fromOptions(options);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
deleted file mode 100644
index a5b951d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class GcsPathValidator implements PathValidator {
-
-  /** Options used to obtain the {@link GcsUtil} for pattern and bucket checks. */
-  private final GcsOptions gcsOptions;
-
-  private GcsPathValidator(GcsOptions options) {
-    this.gcsOptions = options;
-  }
-
-  /** Creates a validator backed by the {@link GcsOptions} view of {@code options}. */
-  public static GcsPathValidator fromOptions(PipelineOptions options) {
-    return new GcsPathValidator(options.as(GcsOptions.class));
-  }
-
-  /**
-   * Validates that the input GCS path is accessible and that the path
-   * is well formed.
-   *
-   * @param filepattern a {@code gs://} file pattern
-   * @return the resource name produced by {@link #verifyPath(String)}
-   * @throws IllegalArgumentException if the pattern cannot be parsed, is not a supported GCS
-   *     pattern, fails {@link #verifyPath(String)}, or its bucket is not accessible
-   * @throws RuntimeException if checking bucket accessibility fails with an I/O error
-   */
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    checkArgument(
-        gcsOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()),
-        "Unsupported GCS file pattern: %s",
-        filepattern);
-    String returnValue = verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
-  }
-
-  /**
-   * Validates that the output GCS path is accessible and that the path
-   * is well formed.
-   *
-   * @param filePrefix a {@code gs://} output prefix
-   * @return the resource name produced by {@link #verifyPath(String)}
-   * @throws IllegalArgumentException if the prefix fails {@link #verifyPath(String)} or its
-   *     bucket is not accessible
-   * @throws RuntimeException if checking bucket accessibility fails with an I/O error
-   */
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
-  }
-
-  /**
-   * Checks that {@code path} is an absolute GCS path whose object name contains no consecutive
-   * slashes, and returns {@link GcsPath#toResourceName()} for it.
-   */
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
-    checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  /**
-   * Fails with {@code errorMessage} (a {@code %s} template filled with {@code path}) when
-   * {@link GcsUtil#bucketAccessible} reports the bucket of {@code path} as inaccessible.
-   * I/O errors during the check are rethrown as {@link RuntimeException}.
-   */
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      checkArgument(gcsOptions.getGcsUtil().bucketAccessible(gcsPath), errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  /** Parses {@code path}, rethrowing parse failures with a message that names the bad path. */
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "Expected a valid 'gs://' path but was given '%s'", path), e);
-    }
-  }
-}