You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/02 17:57:08 UTC
[3/7] beam git commit: [BEAM-2135] Move gcp-core to
google-cloud-platform-core
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java
new file mode 100644
index 0000000..4355a10
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Construct an oauth credential to be used by the SDK and the SDK workers.
+ * Always returns a null Credential object.
+ */
+public class NoopCredentialFactory implements CredentialFactory {
+ private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory();
+ private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials();
+
+ public static NoopCredentialFactory fromOptions(PipelineOptions options) {
+ return INSTANCE;
+ }
+
+ @Override
+ public Credentials getCredential() throws IOException {
+ return NOOP_CREDENTIALS;
+ }
+
+ private static class NoopCredentials extends Credentials {
+ @Override
+ public String getAuthenticationType() {
+ return null;
+ }
+
+ @Override
+ public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean hasRequestMetadata() {
+ return false;
+ }
+
+ @Override
+ public boolean hasRequestMetadataOnly() {
+ return false;
+ }
+
+ @Override
+ public void refresh() throws IOException {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java
new file mode 100644
index 0000000..00306f2
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import java.io.IOException;
+
+/**
+ * A {@link HttpRequestInitializer} for requests that don't have credentials.
+ *
+ * <p>When the access is denied, it throws {@link IOException} with a detailed error message.
+ */
+public class NullCredentialInitializer implements HttpRequestInitializer {
+ private static final int ACCESS_DENIED = 401;
+ private static final String NULL_CREDENTIAL_REASON =
+ "Unable to get application default credentials. Please see "
+ + "https://developers.google.com/accounts/docs/application-default-credentials "
+ + "for details on how to specify credentials. This version of the SDK is "
+ + "dependent on the gcloud core component version 2015.02.05 or newer to "
+ + "be able to get credentials from the currently authorized user via gcloud auth.";
+
+ @Override
+ public void initialize(HttpRequest httpRequest) throws IOException {
+ httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler());
+ }
+
+ private static class NullCredentialHttpUnsuccessfulResponseHandler
+ implements HttpUnsuccessfulResponseHandler {
+
+ @Override
+ public boolean handleResponse(
+ HttpRequest httpRequest,
+ HttpResponse httpResponse, boolean supportsRetry) throws IOException {
+ if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) {
+ throwNullCredentialException();
+ }
+ return supportsRetry;
+ }
+ }
+
+ public static void throwNullCredentialException() {
+ throw new RuntimeException(NULL_CREDENTIAL_REASON);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java
new file mode 100644
index 0000000..3d77bf2
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines classes related to interacting with {@link com.google.auth.Credentials} for
+ * pipeline creation and execution containing Google Cloud Platform components.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
new file mode 100644
index 0000000..87557e5
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
/**
 * Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
 *
 * <p>A marker interface that aggregates the common GCP, application-name, and streaming
 * options so a single {@code PipelineOptions} view carries everything needed to build a
 * CloudResourceManager client.
 */
@Description("Options that are used to configure Google CloudResourceManager. See "
    + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
    PipelineOptions, StreamingOptions {
  // NOTE(review): extends StreamingOptions even though resource management is not
  // streaming-specific — presumably kept for option-registration reasons; confirm.
}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
new file mode 100644
index 0000000..445f00f
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.api.services.storage.model.Bucket;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.security.GeneralSecurityException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Options used to configure Google Cloud Platform specific options such as the project
+ * and credentials.
+ *
+ * <p>These options defer to the
+ * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
+ * application default credentials</a> for authentication. See the
+ * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for
+ * alternative mechanisms for creating credentials.
+ */
+@Description("Options used to configure Google Cloud Platform project and credentials.")
+public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
+ /**
+ * Project id to use when launching jobs.
+ */
+ @Description("Project id. Required when using Google Cloud Platform services. "
+ + "See https://cloud.google.com/storage/docs/projects for further details.")
+ @Default.InstanceFactory(DefaultProjectFactory.class)
+ String getProject();
+ void setProject(String value);
+
+ /**
+ * GCP <a href="https://developers.google.com/compute/docs/zones"
+ * >availability zone</a> for operations.
+ *
+ * <p>Default is set on a per-service basis.
+ */
+ @Description("GCP availability zone for running GCP operations. "
+ + "Default is up to the individual service.")
+ String getZone();
+ void setZone(String value);
+
+ /**
+ * The class of the credential factory that should be created and used to create
+ * credentials. If gcpCredential has not been set explicitly, an instance of this class will
+ * be constructed and used as a credential factory.
+ */
+ @Description("The class of the credential factory that should be created and used to create "
+ + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
+ + "be constructed and used as a credential factory.")
+ @Default.Class(GcpCredentialFactory.class)
+ Class<? extends CredentialFactory> getCredentialFactoryClass();
+ void setCredentialFactoryClass(
+ Class<? extends CredentialFactory> credentialFactoryClass);
+
+ /**
+ * The credential instance that should be used to authenticate against GCP services.
+ * If no credential has been set explicitly, the default is to use the instance factory
+ * that constructs a credential based upon the currently set credentialFactoryClass.
+ */
+ @JsonIgnore
+ @Description("The credential instance that should be used to authenticate against GCP services. "
+ + "If no credential has been set explicitly, the default is to use the instance factory "
+ + "that constructs a credential based upon the currently set credentialFactoryClass.")
+ @Default.InstanceFactory(GcpUserCredentialsFactory.class)
+ Credentials getGcpCredential();
+ void setGcpCredential(Credentials value);
+
+ /**
+ * Attempts to infer the default project based upon the environment this application
+ * is executing within. Currently this only supports getting the default project from gcloud.
+ */
+ class DefaultProjectFactory implements DefaultValueFactory<String> {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
+
+ @Override
+ public String create(PipelineOptions options) {
+ try {
+ File configFile;
+ if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
+ configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
+ } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
+ configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
+ } else {
+ // New versions of gcloud use this file
+ configFile = new File(
+ System.getProperty("user.home"),
+ ".config/gcloud/configurations/config_default");
+ if (!configFile.exists()) {
+ // Old versions of gcloud use this file
+ configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
+ }
+ }
+ String section = null;
+ Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
+ Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
+ for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
+ line = line.trim();
+ if (line.isEmpty() || line.startsWith(";")) {
+ continue;
+ }
+ Matcher matcher = sectionPattern.matcher(line);
+ if (matcher.matches()) {
+ section = matcher.group(1);
+ } else if (section == null || section.equals("core")) {
+ matcher = projectPattern.matcher(line);
+ if (matcher.matches()) {
+ String project = matcher.group(1).trim();
+ LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
+ + "project, please cancel this Pipeline and specify the command-line "
+ + "argument --project.", project);
+ return project;
+ }
+ }
+ }
+ } catch (IOException expected) {
+ LOG.debug("Failed to find default project.", expected);
+ }
+ // return null if can't determine
+ return null;
+ }
+
+ /**
+ * Returns true if running on the Windows OS.
+ */
+ private static boolean isWindows() {
+ return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
+ }
+
+ /**
+ * Used to mock out getting environment variables.
+ */
+ @VisibleForTesting
+ Map<String, String> getEnvironment() {
+ return System.getenv();
+ }
+ }
+
+ /**
+ * Attempts to load the GCP credentials. See
+ * {@link CredentialFactory#getCredential()} for more details.
+ */
+ class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> {
+ @Override
+ public Credentials create(PipelineOptions options) {
+ GcpOptions gcpOptions = options.as(GcpOptions.class);
+ try {
+ CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
+ .fromClass(gcpOptions.getCredentialFactoryClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ return factory.getCredential();
+ } catch (IOException | GeneralSecurityException e) {
+ throw new RuntimeException("Unable to obtain credential", e);
+ }
+ }
+ }
+
+ /**
+ * A GCS path for storing temporary files in GCP.
+ *
+ * <p>Its default to {@link PipelineOptions#getTempLocation}.
+ */
+ @Description("A GCS path for storing temporary files in GCP.")
+ @Default.InstanceFactory(GcpTempLocationFactory.class)
+ @Nullable String getGcpTempLocation();
+ void setGcpTempLocation(String value);
+
+ /**
+ * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
+ */
+ class GcpTempLocationFactory implements DefaultValueFactory<String> {
+ private static final FluentBackoff BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+ static final String DEFAULT_REGION = "us-central1";
+ static final Logger LOG = LoggerFactory.getLogger(GcpTempLocationFactory.class);
+
+ @Override
+ @Nullable
+ public String create(PipelineOptions options) {
+ String tempLocation = options.getTempLocation();
+ if (isNullOrEmpty(tempLocation)) {
+ tempLocation = tryCreateDefaultBucket(options,
+ newCloudResourceManagerClient(options.as(CloudResourceManagerOptions.class)).build());
+ options.setTempLocation(tempLocation);
+ } else {
+ try {
+ PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+ validator.validateOutputFilePrefixSupported(tempLocation);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path, %s. ", tempLocation), e);
+ }
+ }
+ return tempLocation;
+ }
+
+ /**
+ * Creates a default bucket or verifies the existence and proper access control
+ * of an existing default bucket. Returns the location if successful.
+ */
+ @VisibleForTesting
+ static String tryCreateDefaultBucket(
+ PipelineOptions options, CloudResourceManager crmClient) {
+ GcsOptions gcpOptions = options.as(GcsOptions.class);
+
+ final String projectId = gcpOptions.getProject();
+ checkArgument(!isNullOrEmpty(projectId),
+ "--project is a required option.");
+
+ // Look up the project number, to create a default bucket with a stable
+ // name with no special characters.
+ long projectNumber = 0L;
+ try {
+ projectNumber = getProjectNumber(projectId, crmClient);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to verify project with ID " + projectId, e);
+ }
+ String region = DEFAULT_REGION;
+ if (!isNullOrEmpty(gcpOptions.getZone())) {
+ region = getRegionFromZone(gcpOptions.getZone());
+ }
+ final String bucketName =
+ "dataflow-staging-" + region + "-" + projectNumber;
+ LOG.info("No staging location provided, attempting to use default bucket: {}",
+ bucketName);
+ Bucket bucket = new Bucket()
+ .setName(bucketName)
+ .setLocation(region);
+ // Always try to create the bucket before checking access, so that we do not
+ // race with other pipelines that may be attempting to do the same thing.
+ try {
+ gcpOptions.getGcsUtil().createBucket(projectId, bucket);
+ } catch (FileAlreadyExistsException e) {
+ LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable create default bucket.", e);
+ }
+
+ // Once the bucket is expected to exist, verify that it is correctly owned
+ // by the project executing the job.
+ try {
+ long owner = gcpOptions.getGcsUtil().bucketOwner(
+ GcsPath.fromComponents(bucketName, ""));
+ checkArgument(
+ owner == projectNumber,
+ "Bucket owner does not match the project from --project:"
+ + " %s vs. %s", owner, projectNumber);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
+ }
+ return "gs://" + bucketName;
+ }
+
+ /**
+ * Returns the project number or throws an exception if the project does not
+ * exist or has other access exceptions.
+ */
+ private static long getProjectNumber(
+ String projectId,
+ CloudResourceManager crmClient) throws IOException {
+ return getProjectNumber(
+ projectId,
+ crmClient,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT);
+ }
+
+ /**
+ * Returns the project number or throws an error if the project does not
+ * exist or has other access errors.
+ */
+ private static long getProjectNumber(
+ String projectId,
+ CloudResourceManager crmClient,
+ BackOff backoff,
+ Sleeper sleeper) throws IOException {
+ CloudResourceManager.Projects.Get getProject =
+ crmClient.projects().get(projectId);
+ try {
+ Project project = ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(getProject),
+ backoff,
+ RetryDeterminer.SOCKET_ERRORS,
+ IOException.class,
+ sleeper);
+ return project.getProjectNumber();
+ } catch (Exception e) {
+ throw new IOException("Unable to get project number", e);
+ }
+ }
+
+ @VisibleForTesting
+ static String getRegionFromZone(String zone) {
+ String[] zoneParts = zone.split("-");
+ checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
+ return zoneParts[0] + "-" + zoneParts[1];
+ }
+
+ /**
+ * Returns a CloudResourceManager client builder using the specified
+ * {@link CloudResourceManagerOptions}.
+ */
+ @VisibleForTesting
+ static CloudResourceManager.Builder newCloudResourceManagerClient(
+ CloudResourceManagerOptions options) {
+ Credentials credentials = options.getGcpCredential();
+ if (credentials == null) {
+ NullCredentialInitializer.throwNullCredentialException();
+ }
+ return new CloudResourceManager.Builder(Transport.getTransport(), Transport.getJsonFactory(),
+ chainHttpRequestInitializer(
+ credentials,
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ private static HttpRequestInitializer chainHttpRequestInitializer(
+ Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+ if (credential == null) {
+ return new ChainingHttpRequestInitializer(
+ new NullCredentialInitializer(), httpRequestInitializer);
+ } else {
+ return new ChainingHttpRequestInitializer(
+ new HttpCredentialsAdapter(credential),
+ httpRequestInitializer);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..afc3416
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * A registrar containing the default GCP options.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>builder()
+ .add(GcpOptions.class)
+ .add(GcsOptions.class)
+ .add(GoogleApiDebugOptions.class)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
new file mode 100644
index 0000000..954092c
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.GcsPathValidator;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+
+/**
+ * Options used to configure Google Cloud Storage.
+ */
+public interface GcsOptions extends
+ ApplicationNameOptions, GcpOptions, PipelineOptions {
+ /**
+ * The GcsUtil instance that should be used to communicate with Google Cloud Storage.
+ */
+ @JsonIgnore
+ @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
+ @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
+ @Hidden
+ GcsUtil getGcsUtil();
+ void setGcsUtil(GcsUtil value);
+
+ /**
+ * The ExecutorService instance to use to create threads, can be overridden to specify an
+ * ExecutorService that is compatible with the user's environment. If unset, the
+ * default is to create an ExecutorService with an unbounded number of threads; this
+ * is compatible with Google AppEngine.
+ */
+ @JsonIgnore
+ @Description("The ExecutorService instance to use to create multiple threads. Can be overridden "
+ + "to specify an ExecutorService that is compatible with the users environment. If unset, "
+ + "the default is to create an ExecutorService with an unbounded number of threads; this "
+ + "is compatible with Google AppEngine.")
+ @Default.InstanceFactory(ExecutorServiceFactory.class)
+ @Hidden
+ ExecutorService getExecutorService();
+ void setExecutorService(ExecutorService value);
+
+ /**
+ * GCS endpoint to use. If unspecified, uses the default endpoint.
+ */
+ @JsonIgnore
+ @Hidden
+ @Description("The URL for the GCS API.")
+ String getGcsEndpoint();
+ void setGcsEndpoint(String value);
+
+ /**
+ * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
+ * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
+ * restrictions and performance implications of this value.
+ */
+ @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the "
+ + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
+ + "information on the restrictions and performance implications of this value.\n\n"
+ + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
+ + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
+ @Nullable
+ Integer getGcsUploadBufferSizeBytes();
+ void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
+
+ /**
+ * The class of the validator that should be created and used to validate paths.
+ * If pathValidator has not been set explicitly, an instance of this class will be
+ * constructed and used as the path validator.
+ */
+ @Description("The class of the validator that should be created and used to validate paths. "
+ + "If pathValidator has not been set explicitly, an instance of this class will be "
+ + "constructed and used as the path validator.")
+ @Default.Class(GcsPathValidator.class)
+ Class<? extends PathValidator> getPathValidatorClass();
+ void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+ /**
+ * The path validator instance that should be used to validate paths.
+ * If no path validator has been set explicitly, the default is to use the instance factory that
+ * constructs a path validator based upon the currently set pathValidatorClass.
+ */
+ @JsonIgnore
+ @Description("The path validator instance that should be used to validate paths. "
+ + "If no path validator has been set explicitly, the default is to use the instance factory "
+ + "that constructs a path validator based upon the currently set pathValidatorClass.")
+ @Default.InstanceFactory(PathValidatorFactory.class)
+ PathValidator getPathValidator();
+ void setPathValidator(PathValidator validator);
+
+ /**
+ * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The
+ * {@link ExecutorService} is compatible with AppEngine.
+ */
+ class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
+ @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only.
+ @Override
+ public ExecutorService create(PipelineOptions options) {
+ ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+ threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+ threadFactoryBuilder.setDaemon(true);
+ /* The SDK requires an unbounded thread pool because a step may create X writers
+ * each requiring its own thread to perform the writes; otherwise a writer may
+ * block, causing deadlock for the step because the writer's buffer is full.
+ * Also, the MapTaskExecutor launches the steps in reverse order and completes
+ * them in forward order thus requiring enough threads so that each step's writers
+ * can be active.
+ */
+ return new ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
+ Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
+ new SynchronousQueue<Runnable>(),
+ threadFactoryBuilder.build());
+ }
+ }
+
+ /**
+ * Creates a {@link PathValidator} object using the class specified in
+ * {@link #getPathValidatorClass()}.
+ */
+ class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+ @Override
+ public PathValidator create(PipelineOptions options) {
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ return InstanceBuilder.ofType(PathValidator.class)
+ .fromClass(gcsOptions.getPathValidatorClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java
new file mode 100644
index 0000000..01144c4
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import com.google.api.client.googleapis.services.AbstractGoogleClient;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.googleapis.services.GoogleClientRequestInitializer;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * These options configure debug settings for Google API clients created within the Apache Beam SDK.
+ */
+public interface GoogleApiDebugOptions extends PipelineOptions {
+ /**
+ * This option enables tracing of API calls to Google services used within the Apache
+ * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...}
+ * </code> where the {@code ApiName} represents the request class's canonical name. The
+ * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported.
+ * Typically, "producer" is the right destination to use: this makes API traces available to the
+ * team offering the API. Note that by enabling this option, the contents of the requests to and
+ * from Google Cloud services will be made available to Google. For example, by specifying
+ * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available
+ * to Google, specifically to the Google Cloud Dataflow team.
+ */
+ @Description("This option enables tracing of API calls to Google services used within the Apache "
+ + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} "
+ + "where the ApiName represents the request classes canonical name. The TraceDestination is "
+ + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is "
+ + "the right destination to use: this makes API traces available to the team offering the "
+ + "API. Note that by enabling this option, the contents of the requests to and from "
+ + "Google Cloud services will be made available to Google. For example, by specifying "
+ + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to "
+ + "Google, specifically to the Google Cloud Dataflow team.")
+ GoogleApiTracer getGoogleApiTrace();
+ void setGoogleApiTrace(GoogleApiTracer commands);
+
+ /**
+ * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls.
+ */
+ class GoogleApiTracer extends HashMap<String, String>
+ implements GoogleClientRequestInitializer {
+ /**
+ * Creates a {@link GoogleApiTracer} that sets the trace destination on all
+ * calls that match the given client type.
+ */
+ public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) {
+ put(client.getClass().getCanonicalName(), traceDestination);
+ return this;
+ }
+
+ /**
+ * Creates a {@link GoogleApiTracer} that sets the trace {@code traceDestination} on all
+ * calls that match the given request type.
+ */
+ public GoogleApiTracer addTraceFor(
+ AbstractGoogleClientRequest<?> request, String traceDestination) {
+ put(request.getClass().getCanonicalName(), traceDestination);
+ return this;
+ }
+
+ @Override
+ public void initialize(AbstractGoogleClientRequest<?> request) throws IOException {
+ for (Map.Entry<String, String> entry : this.entrySet()) {
+ if (request.getClass().getCanonicalName().contains(entry.getKey())) {
+ request.set("$trace", entry.getValue());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java
new file mode 100644
index 0000000..bc9646c
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines {@link org.apache.beam.sdk.options.PipelineOptions} for
+ * configuring pipeline execution for Google Cloud Platform components.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
new file mode 100644
index 0000000..3a12620
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * Implements IOChannelFactory for GCS.
+ */
+public class GcsIOChannelFactory implements IOChannelFactory {
+
+ /**
+ * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}.
+ */
+ public static GcsIOChannelFactory fromOptions(PipelineOptions options) {
+ return new GcsIOChannelFactory(options.as(GcsOptions.class));
+ }
+
+ private final GcsOptions options;
+
+ private GcsIOChannelFactory(GcsOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ public Collection<String> match(String spec) throws IOException {
+ GcsPath path = GcsPath.fromUri(spec);
+ GcsUtil util = options.getGcsUtil();
+ List<GcsPath> matched = util.expand(path);
+
+ List<String> specs = new LinkedList<>();
+ for (GcsPath match : matched) {
+ specs.add(match.toString());
+ }
+
+ return specs;
+ }
+
+ @Override
+ public ReadableByteChannel open(String spec) throws IOException {
+ GcsPath path = GcsPath.fromUri(spec);
+ GcsUtil util = options.getGcsUtil();
+ return util.open(path);
+ }
+
+ @Override
+ public WritableByteChannel create(String spec, String mimeType)
+ throws IOException {
+ GcsPath path = GcsPath.fromUri(spec);
+ GcsUtil util = options.getGcsUtil();
+ return util.create(path, mimeType);
+ }
+
+ @Override
+ public long getSizeBytes(String spec) throws IOException {
+ GcsPath path = GcsPath.fromUri(spec);
+ GcsUtil util = options.getGcsUtil();
+ return util.fileSize(path);
+ }
+
+ @Override
+ public boolean isReadSeekEfficient(String spec) throws IOException {
+ // TODO It is incorrect to return true here for files with content encoding set to gzip.
+ return true;
+ }
+
+ @Override
+ public String resolve(String path, String other) throws IOException {
+ return toPath(path).resolve(other).toString();
+ }
+
+ @Override
+ public Path toPath(String path) {
+ return GcsPath.fromUri(path);
+ }
+
+ @Override
+ public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+ throws IOException {
+ options.getGcsUtil().copy(srcFilenames, destFilenames);
+ }
+
+ @Override
+ public void remove(Collection<String> filesOrDirs) throws IOException {
+ options.getGcsUtil().remove(filesOrDirs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
new file mode 100644
index 0000000..b4c457f
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for the {@link GcsIOChannelFactory}.
+ */
+@AutoService(IOChannelFactoryRegistrar.class)
+public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar {
+
+ @Override
+ public GcsIOChannelFactory fromOptions(PipelineOptions options) {
+ return GcsIOChannelFactory.fromOptions(options);
+ }
+
+ @Override
+ public String getScheme() {
+ return "gs";
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
new file mode 100644
index 0000000..4d58424
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+ private GcsOptions gcpOptions;
+
+ private GcsPathValidator(GcsOptions options) {
+ this.gcpOptions = options;
+ }
+
+ public static GcsPathValidator fromOptions(PipelineOptions options) {
+ return new GcsPathValidator(options.as(GcsOptions.class));
+ }
+
+ /**
+ * Validates that the input GCS path is accessible and that the path
+ * is well formed.
+ */
+ @Override
+ public String validateInputFilePatternSupported(String filepattern) {
+ GcsPath gcsPath = getGcsPath(filepattern);
+ checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
+ String returnValue = verifyPath(filepattern);
+ verifyPathIsAccessible(filepattern, "Could not find file %s");
+ return returnValue;
+ }
+
+ /**
+ * Validates that the output GCS path is accessible and that the path
+ * is well formed.
+ */
+ @Override
+ public String validateOutputFilePrefixSupported(String filePrefix) {
+ String returnValue = verifyPath(filePrefix);
+ verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+ return returnValue;
+ }
+
+ @Override
+ public String verifyPath(String path) {
+ GcsPath gcsPath = getGcsPath(path);
+ checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+ checkArgument(!gcsPath.getObject().isEmpty(),
+ "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?",
+ gcsPath, gcsPath.getBucket());
+ checkArgument(!gcsPath.getObject().contains("//"),
+ "Dataflow Service does not allow objects with consecutive slashes");
+ return gcsPath.toResourceName();
+ }
+
+ private void verifyPathIsAccessible(String path, String errorMessage) {
+ GcsPath gcsPath = getGcsPath(path);
+ try {
+ checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
+ errorMessage, path);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+ e);
+ }
+ }
+
+ private GcsPath getGcsPath(String path) {
+ try {
+ return GcsPath.fromUri(path);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(String.format(
+ "Expected a valid 'gs://' path but was given '%s'", path), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
new file mode 100644
index 0000000..c8e6839
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
+import com.google.api.client.googleapis.json.GoogleJsonError;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
+import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.ClientRequestHelper;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides operations on GCS.
+ */
+public class GcsUtil {
+ /**
+ * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using
+ * any transport flags specified on the {@link PipelineOptions}.
+ */
+ public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
+ /**
+ * Returns an instance of {@link GcsUtil} based on the
+ * {@link PipelineOptions}.
+ *
+ * <p>If no instance has previously been created, one is created and the value
+ * stored in {@code options}.
+ */
+ @Override
+ public GcsUtil create(PipelineOptions options) {
+ LOG.debug("Creating new GcsUtil");
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
+ return new GcsUtil(
+ storageBuilder.build(),
+ storageBuilder.getHttpRequestInitializer(),
+ gcsOptions.getExecutorService(),
+ gcsOptions.getGcsUploadBufferSizeBytes());
+ }
+
+ /**
+ * Returns an instance of {@link GcsUtil} based on the given parameters.
+ */
+ public static GcsUtil create(
+ Storage storageClient,
+ HttpRequestInitializer httpRequestInitializer,
+ ExecutorService executorService,
+ @Nullable Integer uploadBufferSizeBytes) {
+ return new GcsUtil(
+ storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
+
+ /** Maximum number of items to retrieve per Objects.List request. */
+ private static final long MAX_LIST_ITEMS_PER_CALL = 1024;
+
+ /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
+ private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
+
+ private static final String RECURSIVE_WILDCARD = "[*]{2}";
+
+ /**
+ * A {@link Pattern} for globs with a recursive wildcard.
+ */
+ private static final Pattern RECURSIVE_GCS_PATTERN =
+ Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");
+
+ /**
+ * Maximum number of requests permitted in a GCS batch request.
+ */
+ private static final int MAX_REQUESTS_PER_BATCH = 100;
+ /**
+ * Maximum number of concurrent batches of requests executing on GCS.
+ */
+ private static final int MAX_CONCURRENT_BATCHES = 256;
+
+ private static final FluentBackoff BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /** Client for the GCS API. */
+ private Storage storageClient;
+ private final HttpRequestInitializer httpRequestInitializer;
+ /** Buffer size for GCS uploads (in bytes). */
+ @Nullable private final Integer uploadBufferSizeBytes;
+
+ // Helper delegate for turning IOExceptions from API calls into higher-level semantics.
+ private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+
+ // Exposed for testing.
+ final ExecutorService executorService;
+
+ /**
+ * Returns true if the given GCS pattern is supported otherwise fails with an
+ * exception.
+ */
+ public static boolean isGcsPatternSupported(String gcsPattern) {
+ if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) {
+ throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": "
+ + " recursive wildcards are not supported.");
+ }
+ return true;
+ }
+
+ /**
+ * Returns the prefix portion of the glob that doesn't contain wildcards.
+ */
+ public static String getGlobPrefix(String globExp) {
+ checkArgument(isGcsPatternSupported(globExp));
+ Matcher m = GLOB_PREFIX.matcher(globExp);
+ checkArgument(
+ m.matches(),
+ String.format("Glob expression: [%s] is not expandable.", globExp));
+ return m.group("PREFIX");
+ }
+
+ /**
+ * Expands glob expressions to regular expressions.
+ *
+ * @param globExp the glob expression to expand
+ * @return a string with the regular expression this glob expands to
+ */
+ public static String globToRegexp(String globExp) {
+ StringBuilder dst = new StringBuilder();
+ char[] src = globExp.toCharArray();
+ int i = 0;
+ while (i < src.length) {
+ char c = src[i++];
+ switch (c) {
+ case '*':
+ dst.append("[^/]*");
+ break;
+ case '?':
+ dst.append("[^/]");
+ break;
+ case '.':
+ case '+':
+ case '{':
+ case '}':
+ case '(':
+ case ')':
+ case '|':
+ case '^':
+ case '$':
+ // These need to be escaped in regular expressions
+ dst.append('\\').append(c);
+ break;
+ case '\\':
+ i = doubleSlashes(dst, src, i);
+ break;
+ default:
+ dst.append(c);
+ break;
+ }
+ }
+ return dst.toString();
+ }
+
+ /**
+ * Returns true if the given {@code spec} contains glob.
+ */
+ public static boolean isGlob(GcsPath spec) {
+ return GLOB_PREFIX.matcher(spec.getObject()).matches();
+ }
+
+ private GcsUtil(
+ Storage storageClient,
+ HttpRequestInitializer httpRequestInitializer,
+ ExecutorService executorService,
+ @Nullable Integer uploadBufferSizeBytes) {
+ this.storageClient = storageClient;
+ this.httpRequestInitializer = httpRequestInitializer;
+ this.uploadBufferSizeBytes = uploadBufferSizeBytes;
+ this.executorService = executorService;
+ }
+
  /**
   * Replaces the underlying {@link Storage} client.
   *
   * <p>Use this only for testing purposes.
   */
  protected void setStorageClient(Storage storageClient) {
    this.storageClient = storageClient;
  }
+
+ /**
+ * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded
+ * in the result. For patterns that only match a single object, we ensure that the object
+ * exists.
+ */
+ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
+ checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
+ Pattern p = null;
+ String prefix = null;
+ if (!isGlob(gcsPattern)) {
+ // Not a glob.
+ try {
+ // Use a get request to fetch the metadata of the object, and ignore the return value.
+ // The request has strong global consistency.
+ getObject(gcsPattern);
+ return ImmutableList.of(gcsPattern);
+ } catch (FileNotFoundException e) {
+ // If the path was not found, return an empty list.
+ return ImmutableList.of();
+ }
+ } else {
+ // Part before the first wildcard character.
+ prefix = getGlobPrefix(gcsPattern.getObject());
+ p = Pattern.compile(globToRegexp(gcsPattern.getObject()));
+ }
+
+ LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
+ prefix, p.toString());
+
+ String pageToken = null;
+ List<GcsPath> results = new LinkedList<>();
+ do {
+ Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
+ if (objects.getItems() == null) {
+ break;
+ }
+
+ // Filter objects based on the regex.
+ for (StorageObject o : objects.getItems()) {
+ String name = o.getName();
+ // Skip directories, which end with a slash.
+ if (p.matcher(name).matches() && !name.endsWith("/")) {
+ LOG.debug("Matched object: {}", name);
+ results.add(GcsPath.fromObject(o));
+ }
+ }
+ pageToken = objects.getNextPageToken();
+ } while (pageToken != null);
+
+ return results;
+ }
+
  /** Returns the configured upload buffer size in bytes, or {@code null} if none was set. */
  @VisibleForTesting
  @Nullable
  Integer getUploadBufferSizeBytes() {
    return uploadBufferSizeBytes;
  }
+
  /**
   * Returns the file size from GCS or throws {@link FileNotFoundException}
   * if the resource does not exist.
   *
   * @param path the GCS object to look up
   * @return the object's size in bytes
   */
  public long fileSize(GcsPath path) throws IOException {
    return getObject(path).getSize().longValue();
  }
+
  /**
   * Returns the {@link StorageObject} for the given {@link GcsPath}.
   *
   * <p>Retries socket errors with the default backoff; throws {@link FileNotFoundException}
   * if the object does not exist.
   */
  public StorageObject getObject(GcsPath gcsPath) throws IOException {
    return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
  }
+
+ @VisibleForTesting
+ StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
+ Storage.Objects.Get getObject =
+ storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
+ try {
+ return ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(getObject),
+ backoff,
+ RetryDeterminer.SOCKET_ERRORS,
+ IOException.class,
+ sleeper);
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
+ throw new FileNotFoundException(gcsPath.toString());
+ }
+ throw new IOException(
+ String.format("Unable to get the file object for path %s.", gcsPath),
+ e);
+ }
+ }
+
+ /**
+ * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given
+ * {@link GcsPath GcsPaths}.
+ */
+ public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
+ throws IOException {
+ List<StorageObjectOrIOException[]> results = new ArrayList<>();
+ executeBatches(makeGetBatches(gcsPaths, results));
+ ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder();
+ for (StorageObjectOrIOException[] result : results) {
+ ret.add(result[0]);
+ }
+ return ret.build();
+ }
+
+ /**
+ * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}.
+ */
+ public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
+ throws IOException {
+ // List all objects that start with the prefix (including objects in sub-directories).
+ Storage.Objects.List listObject = storageClient.objects().list(bucket);
+ listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
+ listObject.setPrefix(prefix);
+
+ if (pageToken != null) {
+ listObject.setPageToken(pageToken);
+ }
+
+ try {
+ return ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(listObject),
+ BACKOFF_FACTORY.backoff(),
+ RetryDeterminer.SOCKET_ERRORS,
+ IOException.class);
+ } catch (Exception e) {
+ throw new IOException(
+ String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix),
+ e);
+ }
+ }
+
+ /**
+ * Returns the file size from GCS or throws {@link FileNotFoundException}
+ * if the resource does not exist.
+ */
+ @VisibleForTesting
+ List<Long> fileSizes(List<GcsPath> paths) throws IOException {
+ List<StorageObjectOrIOException> results = getObjects(paths);
+
+ ImmutableList.Builder<Long> ret = ImmutableList.builder();
+ for (StorageObjectOrIOException result : results) {
+ ret.add(toFileSize(result));
+ }
+ return ret.build();
+ }
+
+ private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException)
+ throws IOException {
+ if (storageObjectOrIOException.ioException() != null) {
+ throw storageObjectOrIOException.ioException();
+ } else {
+ return storageObjectOrIOException.storageObject().getSize().longValue();
+ }
+ }
+
  /**
   * Opens an object in GCS.
   *
   * <p>Returns a SeekableByteChannel that provides access to data in the bucket.
   *
   * @param path the GCS filename to read from
   * @return a SeekableByteChannel that can read the object data
   * @throws IOException if the channel cannot be created
   */
  public SeekableByteChannel open(GcsPath path)
      throws IOException {
    return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(),
        path.getObject(), errorExtractor,
        new ClientRequestHelper<StorageObject>());
  }
+
  /**
   * Creates an object in GCS.
   *
   * <p>Returns a WritableByteChannel that can be used to write data to the
   * object.
   *
   * @param path the GCS file to write to
   * @param type the type of object, eg "text/plain".
   * @return a WritableByteChannel that accepts the object's contents
   */
  public WritableByteChannel create(GcsPath path,
      String type) throws IOException {
    GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel(
        executorService,
        storageClient,
        new ClientRequestHelper<StorageObject>(),
        path.getBucket(),
        path.getObject(),
        AsyncWriteChannelOptions.newBuilder().build(),
        new ObjectWriteConditions(),
        Collections.<String, String>emptyMap(),
        type);
    // Apply the configured upload buffer size, if any, before starting the upload.
    if (uploadBufferSizeBytes != null) {
      channel.setUploadBufferSize(uploadBufferSizeBytes);
    }
    channel.initialize();
    return channel;
  }
+
  /**
   * Returns whether the GCS bucket exists and is accessible.
   *
   * <p>Uses the default backoff and sleeper; see the three-argument overload for details.
   */
  public boolean bucketAccessible(GcsPath path) throws IOException {
    return bucketAccessible(
        path,
        BACKOFF_FACTORY.backoff(),
        Sleeper.DEFAULT);
  }
+
  /**
   * Returns the project number of the project which owns this bucket.
   * If the bucket exists, it must be accessible otherwise the permissions
   * exception will be propagated. If the bucket does not exist, an exception
   * will be thrown.
   */
  public long bucketOwner(GcsPath path) throws IOException {
    return getBucket(
        path,
        BACKOFF_FACTORY.backoff(),
        Sleeper.DEFAULT).getProjectNumber().longValue();
  }
+
  /**
   * Creates a {@link Bucket} under the specified project in Cloud Storage or
   * propagates an exception.
   *
   * <p>Uses the default backoff and sleeper; see the four-argument overload for details.
   */
  public void createBucket(String projectId, Bucket bucket) throws IOException {
    createBucket(
        projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
  }
+
+ /**
+ * Returns whether the GCS bucket exists. This will return false if the bucket
+ * is inaccessible due to permissions.
+ */
+ @VisibleForTesting
+ boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+ try {
+ return getBucket(path, backoff, sleeper) != null;
+ } catch (AccessDeniedException | FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ @VisibleForTesting
+ @Nullable
+ Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+ Storage.Buckets.Get getBucket =
+ storageClient.buckets().get(path.getBucket());
+
+ try {
+ Bucket bucket = ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(getBucket),
+ backoff,
+ new RetryDeterminer<IOException>() {
+ @Override
+ public boolean shouldRetry(IOException e) {
+ if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+ return false;
+ }
+ return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+ }
+ },
+ IOException.class,
+ sleeper);
+
+ return bucket;
+ } catch (GoogleJsonResponseException e) {
+ if (errorExtractor.accessDenied(e)) {
+ throw new AccessDeniedException(path.toString(), null, e.getMessage());
+ }
+ if (errorExtractor.itemNotFound(e)) {
+ throw new FileNotFoundException(e.getMessage());
+ }
+ throw e;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ String.format("Error while attempting to verify existence of bucket gs://%s",
+ path.getBucket()), e);
+ }
+ }
+
+ @VisibleForTesting
+ void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
+ throws IOException {
+ Storage.Buckets.Insert insertBucket =
+ storageClient.buckets().insert(projectId, bucket);
+ insertBucket.setPredefinedAcl("projectPrivate");
+ insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
+
+ try {
+ ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(insertBucket),
+ backoff,
+ new RetryDeterminer<IOException>() {
+ @Override
+ public boolean shouldRetry(IOException e) {
+ if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) {
+ return false;
+ }
+ return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+ }
+ },
+ IOException.class,
+ sleeper);
+ return;
+ } catch (GoogleJsonResponseException e) {
+ if (errorExtractor.accessDenied(e)) {
+ throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
+ }
+ if (errorExtractor.itemAlreadyExists(e)) {
+ throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage());
+ }
+ throw e;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ String.format("Error while attempting to create bucket gs://%s for rproject %s",
+ bucket.getName(), projectId), e);
+ }
+ }
+
+ private static void executeBatches(List<BatchRequest> batches) throws IOException {
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+ MoreExecutors.getExitingExecutorService(
+ new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>())));
+
+ List<ListenableFuture<Void>> futures = new LinkedList<>();
+ for (final BatchRequest batch : batches) {
+ futures.add(executor.submit(new Callable<Void>() {
+ public Void call() throws IOException {
+ batch.execute();
+ return null;
+ }
+ }));
+ }
+
+ try {
+ Futures.allAsList(futures).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while executing batch GCS request", e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof FileNotFoundException) {
+ throw (FileNotFoundException) e.getCause();
+ }
+ throw new IOException("Error executing batch GCS request", e);
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ /**
+ * Makes get {@link BatchRequest BatchRequests}.
+ *
+ * @param paths {@link GcsPath GcsPaths}.
+ * @param results mutable {@link List} for return values.
+ * @return {@link BatchRequest BatchRequests} to execute.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ List<BatchRequest> makeGetBatches(
+ Collection<GcsPath> paths,
+ List<StorageObjectOrIOException[]> results) throws IOException {
+ List<BatchRequest> batches = new LinkedList<>();
+ for (List<GcsPath> filesToGet :
+ Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
+ BatchRequest batch = createBatchRequest();
+ for (GcsPath path : filesToGet) {
+ results.add(enqueueGetFileSize(path, batch));
+ }
+ batches.add(batch);
+ }
+ return batches;
+ }
+
  /**
   * Copies each source object to the destination at the same index, using batched requests.
   * The two iterables must be the same length.
   */
  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
      throws IOException {
    executeBatches(makeCopyBatches(srcFilenames, destFilenames));
  }
+
+ List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+ throws IOException {
+ List<String> srcList = Lists.newArrayList(srcFilenames);
+ List<String> destList = Lists.newArrayList(destFilenames);
+ checkArgument(
+ srcList.size() == destList.size(),
+ "Number of source files %s must equal number of destination files %s",
+ srcList.size(),
+ destList.size());
+
+ List<BatchRequest> batches = new LinkedList<>();
+ BatchRequest batch = createBatchRequest();
+ for (int i = 0; i < srcList.size(); i++) {
+ final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
+ final GcsPath destPath = GcsPath.fromUri(destList.get(i));
+ enqueueCopy(sourcePath, destPath, batch);
+ if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
+ batches.add(batch);
+ batch = createBatchRequest();
+ }
+ }
+ if (batch.size() > 0) {
+ batches.add(batch);
+ }
+ return batches;
+ }
+
+ List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException {
+ List<BatchRequest> batches = new LinkedList<>();
+ for (List<String> filesToDelete :
+ Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) {
+ BatchRequest batch = createBatchRequest();
+ for (String file : filesToDelete) {
+ enqueueDelete(GcsPath.fromUri(file), batch);
+ }
+ batches.add(batch);
+ }
+ return batches;
+ }
+
  /** Deletes the given GCS objects using batched requests. */
  public void remove(Collection<String> filenames) throws IOException {
    executeBatches(makeRemoveBatches(filenames));
  }
+
  /**
   * Queues a metadata get for {@code path} on {@code batch} and returns a one-element slot that
   * the batch callback later fills with either the {@link StorageObject} or the failure.
   * A "not found" response is recorded as a {@link FileNotFoundException}.
   */
  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch)
      throws IOException {
    final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1];

    Storage.Objects.Get getRequest = storageClient.objects()
        .get(path.getBucket(), path.getObject());
    getRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
      @Override
      public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException {
        ret[0] = StorageObjectOrIOException.create(response);
      }

      @Override
      public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException {
        // Failures are captured in the slot rather than thrown, so one missing object
        // does not abort the rest of the batch.
        IOException ioException;
        if (errorExtractor.itemNotFound(e)) {
          ioException = new FileNotFoundException(path.toString());
        } else {
          ioException = new IOException(String.format("Error trying to get %s: %s", path, e));
        }
        ret[0] = StorageObjectOrIOException.create(ioException);
      }
    });
    return ret;
  }
+
  /**
   * A class that holds either a {@link StorageObject} or an {@link IOException}.
   *
   * <p>Exactly one of the two is non-null, enforced by the {@code create} factories.
   */
  @AutoValue
  public abstract static class StorageObjectOrIOException {

    /**
     * Returns the {@link StorageObject}, or {@code null} if this holds an exception.
     */
    @Nullable
    public abstract StorageObject storageObject();

    /**
     * Returns the {@link IOException}, or {@code null} if this holds a storage object.
     */
    @Nullable
    public abstract IOException ioException();

    /** Wraps a non-null {@link StorageObject}. */
    @VisibleForTesting
    public static StorageObjectOrIOException create(StorageObject storageObject) {
      return new AutoValue_GcsUtil_StorageObjectOrIOException(
          checkNotNull(storageObject, "storageObject"),
          null /* ioException */);
    }

    /** Wraps a non-null {@link IOException}. */
    @VisibleForTesting
    public static StorageObjectOrIOException create(IOException ioException) {
      return new AutoValue_GcsUtil_StorageObjectOrIOException(
          null /* storageObject */,
          checkNotNull(ioException, "ioException"));
    }
  }
+
  /**
   * Queues a copy of {@code from} to {@code to} on {@code batch}. A failed copy surfaces as an
   * {@link IOException} thrown from the batch callback.
   */
  private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch)
      throws IOException {
    Storage.Objects.Copy copyRequest = storageClient.objects()
        .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null);
    copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
      @Override
      public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
        LOG.debug("Successfully copied {} to {}", from, to);
      }

      @Override
      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
        throw new IOException(
            String.format("Error trying to copy %s to %s: %s", from, to, e));
      }
    });
  }
+
  /**
   * Queues a delete of {@code file} on {@code batch}. A failed delete surfaces as an
   * {@link IOException} thrown from the batch callback.
   */
  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException {
    Storage.Objects.Delete deleteRequest = storageClient.objects()
        .delete(file.getBucket(), file.getObject());
    deleteRequest.queue(batch, new JsonBatchCallback<Void>() {
      @Override
      public void onSuccess(Void obj, HttpHeaders responseHeaders) {
        LOG.debug("Successfully deleted {}", file);
      }

      @Override
      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
        throw new IOException(String.format("Error trying to delete %s: %s", file, e));
      }
    });
  }
+
  /** Creates a new batch request configured with this instance's request initializer. */
  private BatchRequest createBatchRequest() {
    return storageClient.batch(httpRequestInitializer);
  }
+
+ private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+ // Emit the next character without special interpretation
+ dst.append('\\');
+ if ((i - 1) != src.length) {
+ dst.append(src[i]);
+ i++;
+ } else {
+ // A backslash at the very end is treated like an escaped backslash
+ dst.append('\\');
+ }
+ return i;
+ }
+}