You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/02 17:57:07 UTC
[2/7] beam git commit: [BEAM-2135] Move gcp-core to
google-cloud-platform-core
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
new file mode 100644
index 0000000..2b7135e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseInterceptor;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a request initializer that adds retry handlers to all
+ * HttpRequests.
+ *
+ * <p>Also can take a HttpResponseInterceptor to be applied to the responses.
+ */
+public class RetryHttpRequestInitializer implements HttpRequestInitializer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RetryHttpRequestInitializer.class);
+
+ /**
+ * Http response codes that should be silently ignored.
+ */
+ private static final Set<Integer> DEFAULT_IGNORED_RESPONSE_CODES = new HashSet<>(
+ Arrays.asList(307 /* Redirect, handled by the client library */,
+ 308 /* Resume Incomplete, handled by the client library */));
+
+ /**
+ * Http response timeout to use for hanging gets.
+ */
+ private static final int HANGING_GET_TIMEOUT_SEC = 80;
+
+ private static class LoggingHttpBackOffIOExceptionHandler
+ extends HttpBackOffIOExceptionHandler {
+ public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) {
+ super(backOff);
+ }
+
+ @Override
+ public boolean handleIOException(HttpRequest request, boolean supportsRetry)
+ throws IOException {
+ boolean willRetry = super.handleIOException(request, supportsRetry);
+ if (willRetry) {
+ LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
+ } else {
+ LOG.warn("Request failed with IOException, will NOT retry: {}", request.getUrl());
+ }
+ return willRetry;
+ }
+ }
+
+ private static class LoggingHttpBackoffUnsuccessfulResponseHandler
+ implements HttpUnsuccessfulResponseHandler {
+ private final HttpBackOffUnsuccessfulResponseHandler handler;
+ private final Set<Integer> ignoredResponseCodes;
+
+ public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff,
+ Sleeper sleeper, Set<Integer> ignoredResponseCodes) {
+ this.ignoredResponseCodes = ignoredResponseCodes;
+ handler = new HttpBackOffUnsuccessfulResponseHandler(backoff);
+ handler.setSleeper(sleeper);
+ handler.setBackOffRequired(
+ new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() {
+ @Override
+ public boolean isRequired(HttpResponse response) {
+ int statusCode = response.getStatusCode();
+ return (statusCode / 100 == 5) || // 5xx: server error
+ statusCode == 429; // 429: Too many requests
+ }
+ });
+ }
+
+ @Override
+ public boolean handleResponse(HttpRequest request, HttpResponse response,
+ boolean supportsRetry) throws IOException {
+ boolean retry = handler.handleResponse(request, response, supportsRetry);
+ if (retry) {
+ LOG.debug("Request failed with code {} will retry: {}",
+ response.getStatusCode(), request.getUrl());
+
+ } else if (!ignoredResponseCodes.contains(response.getStatusCode())) {
+ LOG.warn("Request failed with code {}, will NOT retry: {}",
+ response.getStatusCode(), request.getUrl());
+ }
+
+ return retry;
+ }
+ }
+
+ private final HttpResponseInterceptor responseInterceptor; // response Interceptor to use
+
+ private final NanoClock nanoClock; // used for testing
+
+ private final Sleeper sleeper; // used for testing
+
+ private Set<Integer> ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);
+
+ public RetryHttpRequestInitializer() {
+ this(Collections.<Integer>emptyList());
+ }
+
+ /**
+ * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+ */
+ public RetryHttpRequestInitializer(Collection<Integer> additionalIgnoredResponseCodes) {
+ this(additionalIgnoredResponseCodes, null);
+ }
+
+ /**
+ * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+ * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
+ */
+ public RetryHttpRequestInitializer(
+ Collection<Integer> additionalIgnoredResponseCodes,
+ @Nullable HttpResponseInterceptor responseInterceptor) {
+ this(NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes,
+ responseInterceptor);
+ }
+
+ /**
+ * Visible for testing.
+ *
+ * @param nanoClock used as a timing source for knowing how much time has elapsed.
+ * @param sleeper used to sleep between retries.
+ * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+ */
+ RetryHttpRequestInitializer(
+ NanoClock nanoClock, Sleeper sleeper, Collection<Integer> additionalIgnoredResponseCodes,
+ HttpResponseInterceptor responseInterceptor) {
+ this.nanoClock = nanoClock;
+ this.sleeper = sleeper;
+ this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
+ this.responseInterceptor = responseInterceptor;
+ }
+
+ @Override
+ public void initialize(HttpRequest request) throws IOException {
+ // Set a read timeout for hanging-gets (the constant is in seconds, hence the * 1000).
+ // TODO: Do this exclusively for work requests.
+ request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
+
+ // Back off on retryable http errors.
+ request.setUnsuccessfulResponseHandler(
+ // A back-off multiplier of 2 raises the maximum request retrying time
+ // to approximately 5 minutes (keeping other back-off parameters to
+ // their default values).
+ new LoggingHttpBackoffUnsuccessfulResponseHandler(
+ new ExponentialBackOff.Builder().setNanoClock(nanoClock)
+ .setMultiplier(2).build(),
+ sleeper, ignoredResponseCodes));
+
+ // Retry immediately (zero back-off) on IOExceptions.
+ LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler =
+ new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF);
+ request.setIOExceptionHandler(loggingBackoffHandler);
+
+ // Install the response interceptor, if one was supplied.
+ if (responseInterceptor != null) {
+ request.setResponseInterceptor(responseInterceptor);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
new file mode 100644
index 0000000..b8474bb
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.storage.Storage;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class Transport {
+
+ private static class SingletonHelper {
+ /** Global instance of the JSON factory. */
+ private static final JsonFactory JSON_FACTORY;
+
+ /** Global instance of the HTTP transport. */
+ private static final HttpTransport HTTP_TRANSPORT;
+
+ static {
+ try {
+ JSON_FACTORY = JacksonFactory.getDefaultInstance();
+ HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
+ } catch (GeneralSecurityException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static HttpTransport getTransport() {
+ return SingletonHelper.HTTP_TRANSPORT;
+ }
+
+ public static JsonFactory getJsonFactory() {
+ return SingletonHelper.JSON_FACTORY;
+ }
+
+ private static class ApiComponents {
+ public String rootUrl;
+ public String servicePath;
+
+ public ApiComponents(String root, String path) {
+ this.rootUrl = root;
+ this.servicePath = path;
+ }
+ }
+
+ /** Splits a service endpoint URL into its root URL (scheme://host[:port]) and path. */
+ private static ApiComponents apiComponentsFromUrl(String urlString) {
+ try {
+ URL url = new URL(urlString);
+ String port = url.getPort() > 0 ? ":" + url.getPort() : "";
+ return new ApiComponents(url.getProtocol() + "://" + url.getHost() + port, url.getPath());
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("Invalid URL: " + urlString, e);
+ }
+ }
+
+ /**
+ * Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
+ */
+ public static Storage.Builder
+ newStorageClient(GcsOptions options) {
+ String servicePath = options.getGcsEndpoint();
+ Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log the code 404. Code up the stack will deal with 404's if needed, and
+ // logging it by default clutters the output during file staging.
+ new RetryHttpRequestInitializer(
+ ImmutableList.of(404), new UploadIdResponseInterceptor())))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ if (servicePath != null) {
+ ApiComponents components = apiComponentsFromUrl(servicePath);
+ storageBuilder.setRootUrl(components.rootUrl);
+ storageBuilder.setServicePath(components.servicePath);
+ }
+ return storageBuilder;
+ }
+
+ private static HttpRequestInitializer chainHttpRequestInitializer(
+ Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+ if (credential == null) {
+ return new ChainingHttpRequestInitializer(
+ new NullCredentialInitializer(), httpRequestInitializer);
+ } else {
+ return new ChainingHttpRequestInitializer(
+ new HttpCredentialsAdapter(credential),
+ httpRequestInitializer);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
new file mode 100644
index 0000000..6a71bdc
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.gcsfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.google.api.services.storage.model.StorageObject;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Implements the Java NIO {@link Path} API for Google Cloud Storage paths.
+ *
+ * <p>GcsPath uses a slash ('/') as a directory separator. Below is
+ * a summary of how slashes are treated:
+ * <ul>
+ * <li> A GCS bucket may not contain a slash. An object may contain zero or
+ * more slashes.
+ * <li> A trailing slash always indicates a directory, which is compliant
+ * with POSIX.1-2008.
+ * <li> Slashes separate components of a path. Empty components are allowed,
+ * these are represented as repeated slashes. An empty component always
+ * refers to a directory, and always ends in a slash.
+ * <li> {@link #getParent()} always returns a path ending in a slash, as the
+ * parent of a GcsPath is always a directory.
+ * <li> Use {@link #resolve(String)} to append elements to a GcsPath -- this
+ * applies the rules consistently and is highly recommended over any
+ * custom string concatenation.
+ * </ul>
+ *
+ * <p>GcsPath treats all GCS objects and buckets as belonging to the same
+ * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="".
+ *
+ * <p>Relative paths are not associated with any bucket. This matches common
+ * treatment of Path in which relative paths can be constructed from one
+ * filesystem and appended to another filesystem.
+ *
+ * @see <a href=
+ * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
+ * >Java Tutorials: Path Operations</a>
+ */
+public class GcsPath implements Path, Serializable {
+
+ public static final String SCHEME = "gs";
+
+ /**
+ * Creates a GcsPath from a URI.
+ *
+ * <p>The URI must be in the form {@code gs://[bucket]/[path]}, and may not
+ * contain a port, user info, a query, or a fragment.
+ *
+ * @throws IllegalArgumentException if the URI violates any of these rules
+ */
+ public static GcsPath fromUri(URI uri) {
+ // SCHEME is the receiver so that a scheme-less (relative) URI is rejected, not an NPE.
+ checkArgument(SCHEME.equalsIgnoreCase(uri.getScheme()), "URI: %s is not a GCS URI", uri);
+ checkArgument(uri.getPort() == -1,
+ "GCS URI may not specify port: %s (%s)", uri, uri.getPort());
+ checkArgument(isNullOrEmpty(uri.getUserInfo()),
+ "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo());
+ checkArgument(isNullOrEmpty(uri.getQuery()),
+ "GCS URI may not specify query: %s (%s)", uri, uri.getQuery());
+ checkArgument(isNullOrEmpty(uri.getFragment()),
+ "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment());
+ return fromUri(uri.toString());
+ }
+
+ /**
+ * Pattern that is used to parse a GCS URL.
+ *
+ * <p>This is used to separate the components. Verification is handled
+ * separately.
+ */
+ public static final Pattern GCS_URI =
+ Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+ /**
+ * Creates a GcsPath from a URI in string form.
+ *
+ * <p>This does not use URI parsing, which means it may accept patterns that
+ * the URI parser would not accept.
+ */
+ public static GcsPath fromUri(String uri) {
+ Matcher m = GCS_URI.matcher(uri);
+ checkArgument(m.matches(), "Invalid GCS URI: %s", uri);
+
+ checkArgument(m.group("SCHEME").equalsIgnoreCase(SCHEME),
+ "URI: %s is not a GCS URI", uri);
+ return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
+ }
+
+ /**
+ * Pattern that is used to parse a GCS resource name.
+ */
+ private static final Pattern GCS_RESOURCE_NAME =
+ Pattern.compile("storage.googleapis.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+ /**
+ * Creates a GcsPath from a OnePlatform resource name in string form.
+ */
+ public static GcsPath fromResourceName(String name) {
+ Matcher m = GCS_RESOURCE_NAME.matcher(name);
+ checkArgument(m.matches(), "Invalid GCS resource name: %s", name);
+
+ return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
+ }
+
+ /**
+ * Creates a GcsPath from a {@linkplain StorageObject}.
+ */
+ public static GcsPath fromObject(StorageObject object) {
+ return new GcsPath(null, object.getBucket(), object.getName());
+ }
+
+ /**
+ * Creates a GcsPath from bucket and object components.
+ *
+ * <p>A GcsPath without a bucket name is treated as a relative path, which
+ * is a path component with no linkage to the root element. This is similar
+ * to a Unix path that does not begin with the root marker (a slash).
+ * GCS has different naming constraints and APIs for working with buckets and
+ * objects, so these two concepts are kept separate to avoid accidental
+ * attempts to treat objects as buckets, or vice versa, as much as possible.
+ *
+ * <p>A GcsPath without an object name is a bucket reference.
+ * A bucket is always a directory, which could be used to lookup or add
+ * files to a bucket, but could not be opened as a file.
+ *
+ * <p>A GcsPath containing neither bucket nor object names is treated as
+ * the root of the GCS filesystem. A listing on the root element would return
+ * the buckets available to the user.
+ *
+ * <p>If {@code null} is passed as either parameter, it is converted to an
+ * empty string internally for consistency. There is no distinction between
+ * an empty string and a {@code null}, as neither are allowed by GCS.
+ *
+ * @param bucket a GCS bucket name, or none ({@code null} or an empty string)
+ * if the object is not associated with a bucket
+ * (e.g. relative paths or the root node).
+ * @param object a GCS object path, or none ({@code null} or an empty string)
+ * for no object.
+ */
+ public static GcsPath fromComponents(@Nullable String bucket,
+ @Nullable String object) {
+ return new GcsPath(null, bucket, object);
+ }
+
+ @Nullable
+ private transient FileSystem fs;
+ @Nonnull
+ private final String bucket;
+ @Nonnull
+ private final String object;
+
+ /**
+ * Constructs a GcsPath.
+ *
+ * @param fs the associated FileSystem, if any
+ * @param bucket the associated bucket, or none ({@code null} or an empty
+ * string) for a relative path component
+ * @param object the object, which is a fully-qualified object name if bucket
+ * was also provided, or none ({@code null} or an empty string)
+ * for no object
+ * @throws java.lang.IllegalArgumentException if the bucket or object names
+ * are invalid.
+ */
+ public GcsPath(@Nullable FileSystem fs,
+ @Nullable String bucket,
+ @Nullable String object) {
+ if (bucket == null) {
+ bucket = "";
+ }
+ checkArgument(!bucket.contains("/"),
+ "GCS bucket may not contain a slash");
+ checkArgument(bucket.isEmpty()
+ || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"),
+ "GCS bucket names must contain only lowercase letters, numbers, "
+ + "dashes (-), underscores (_), and dots (.). Bucket names "
+ + "must start and end with a number or letter. "
+ + "See https://developers.google.com/storage/docs/bucketnaming "
+ + "for more details. Bucket name: " + bucket);
+
+ if (object == null) {
+ object = "";
+ }
+ checkArgument(
+ object.indexOf('\n') < 0 && object.indexOf('\r') < 0,
+ "GCS object names must not contain Carriage Return or "
+ + "Line Feed characters.");
+
+ this.fs = fs;
+ this.bucket = bucket;
+ this.object = object;
+ }
+
+ /**
+ * Returns the bucket name associated with this GCS path, or an empty string
+ * if this is a relative path component.
+ */
+ public String getBucket() {
+ return bucket;
+ }
+
+ /**
+ * Returns the object name associated with this GCS path, or an empty string
+ * if no object is specified.
+ */
+ public String getObject() {
+ return object;
+ }
+
+ public void setFileSystem(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
+ // Absolute paths are those that have a bucket and the root path.
+ @Override
+ public boolean isAbsolute() {
+ return !bucket.isEmpty() || object.isEmpty();
+ }
+
+ @Override
+ public GcsPath getRoot() {
+ return new GcsPath(fs, "", "");
+ }
+
+ @Override
+ public GcsPath getFileName() {
+ int nameCount = getNameCount();
+ if (nameCount < 2) {
+ throw new UnsupportedOperationException(
+ "Can't get filename from root path in the bucket: " + this);
+ }
+ return getName(nameCount - 1);
+ }
+
+ /**
+ * Returns the <em>parent path</em>, or {@code null} if this path does not
+ * have a parent.
+ *
+ * <p>Returns a path that ends in '/', as the parent path always refers to
+ * a directory.
+ */
+ @Override
+ public GcsPath getParent() {
+ if (bucket.isEmpty() && object.isEmpty()) {
+ // The root path has no parent, by definition.
+ return null;
+ }
+
+ if (object.isEmpty()) {
+ // A GCS bucket. All buckets come from a common root.
+ return getRoot();
+ }
+
+ // Skip last character, in case it is a trailing slash.
+ int i = object.lastIndexOf('/', object.length() - 2);
+ if (i <= 0) {
+ if (bucket.isEmpty()) {
+ // Relative paths are not attached to the root node.
+ return null;
+ }
+ return new GcsPath(fs, bucket, "");
+ }
+
+ // Retain trailing slash.
+ return new GcsPath(fs, bucket, object.substring(0, i + 1));
+ }
+
+ @Override
+ public int getNameCount() {
+ int count = bucket.isEmpty() ? 0 : 1;
+ if (object.isEmpty()) {
+ return count;
+ }
+
+ // Add another for each separator found.
+ int index = -1;
+ while ((index = object.indexOf('/', index + 1)) != -1) {
+ count++;
+ }
+
+ return object.endsWith("/") ? count : count + 1;
+ }
+
+ @Override
+ public GcsPath getName(int count) {
+ checkArgument(count >= 0);
+
+ Iterator<Path> iterator = iterator();
+ for (int i = 0; i < count; ++i) {
+ checkArgument(iterator.hasNext());
+ iterator.next();
+ }
+
+ checkArgument(iterator.hasNext());
+ return (GcsPath) iterator.next();
+ }
+
+ @Override
+ public GcsPath subpath(int beginIndex, int endIndex) {
+ checkArgument(beginIndex >= 0);
+ checkArgument(endIndex > beginIndex);
+
+ Iterator<Path> iterator = iterator();
+ for (int i = 0; i < beginIndex; ++i) {
+ checkArgument(iterator.hasNext());
+ iterator.next();
+ }
+
+ GcsPath path = null;
+ while (beginIndex < endIndex) {
+ checkArgument(iterator.hasNext());
+ if (path == null) {
+ path = (GcsPath) iterator.next();
+ } else {
+ path = path.resolve(iterator.next());
+ }
+ ++beginIndex;
+ }
+
+ return path;
+ }
+
+ @Override
+ public boolean startsWith(Path other) {
+ if (other instanceof GcsPath) {
+ GcsPath gcsPath = (GcsPath) other;
+ return startsWith(gcsPath.bucketAndObject());
+ } else {
+ return startsWith(other.toString());
+ }
+ }
+
+ @Override
+ public boolean startsWith(String prefix) {
+ return bucketAndObject().startsWith(prefix);
+ }
+
+ @Override
+ public boolean endsWith(Path other) {
+ if (other instanceof GcsPath) {
+ GcsPath gcsPath = (GcsPath) other;
+ return endsWith(gcsPath.bucketAndObject());
+ } else {
+ return endsWith(other.toString());
+ }
+ }
+
+ @Override
+ public boolean endsWith(String suffix) {
+ return bucketAndObject().endsWith(suffix);
+ }
+
+ // TODO: support "." and ".." path components?
+ @Override
+ public GcsPath normalize() {
+ return this;
+ }
+
+ @Override
+ public GcsPath resolve(Path other) {
+ if (other instanceof GcsPath) {
+ GcsPath path = (GcsPath) other;
+ if (path.isAbsolute()) {
+ return path;
+ } else {
+ return resolve(path.getObject());
+ }
+ } else {
+ return resolve(other.toString());
+ }
+ }
+
+ @Override
+ public GcsPath resolve(String other) {
+ if (bucket.isEmpty() && object.isEmpty()) {
+ // Resolve on a root path is equivalent to looking up a bucket and object.
+ other = SCHEME + "://" + other;
+ }
+
+ if (other.startsWith(SCHEME + "://")) {
+ GcsPath path = GcsPath.fromUri(other);
+ path.setFileSystem(getFileSystem());
+ return path;
+ }
+
+ if (other.isEmpty()) {
+ // An empty component MUST refer to a directory.
+ other = "/";
+ }
+
+ if (object.isEmpty()) {
+ return new GcsPath(fs, bucket, other);
+ } else if (object.endsWith("/")) {
+ return new GcsPath(fs, bucket, object + other);
+ } else {
+ return new GcsPath(fs, bucket, object + "/" + other);
+ }
+ }
+
+ @Override
+ public Path resolveSibling(Path other) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Path resolveSibling(String other) {
+ if (getNameCount() < 2) {
+ throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this);
+ }
+ GcsPath parent = getParent();
+ return (parent == null) ? fromUri(other) : parent.resolve(other);
+ }
+
+ @Override
+ public Path relativize(Path other) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public GcsPath toAbsolutePath() {
+ return this;
+ }
+
+ @Override
+ public GcsPath toRealPath(LinkOption... options) throws IOException {
+ return this;
+ }
+
+ @Override
+ public File toFile() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events,
+ WatchEvent.Modifier... modifiers) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<Path> iterator() {
+ return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject());
+ }
+
+ private static class NameIterator implements Iterator<Path> {
+ private final FileSystem fs;
+ private boolean fullPath;
+ private String name;
+
+ NameIterator(FileSystem fs, boolean fullPath, String name) {
+ this.fs = fs;
+ this.fullPath = fullPath;
+ this.name = name;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !isNullOrEmpty(name);
+ }
+
+ /** Returns the next path component; the first component of a full path is the bucket. */
+ @Override
+ public GcsPath next() {
+ if (!hasNext()) {
+ // Honor the Iterator contract instead of failing with a NullPointerException.
+ throw new java.util.NoSuchElementException();
+ }
+ // Split off everything before the first slash; the remainder is kept for later calls.
+ int i = name.indexOf('/');
+ String component = (i >= 0) ? name.substring(0, i) : name;
+ name = (i >= 0) ? name.substring(i + 1) : null;
+ if (fullPath) {
+ // Only the first component of a full path names the bucket.
+ fullPath = false;
+ return new GcsPath(fs, component, "");
+ }
+ // Relative paths have no bucket.
+ return new GcsPath(fs, "", component);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public int compareTo(Path other) {
+ if (!(other instanceof GcsPath)) {
+ throw new ClassCastException();
+ }
+
+ GcsPath path = (GcsPath) other;
+ int b = bucket.compareTo(path.bucket);
+ if (b != 0) {
+ return b;
+ }
+
+ // Compare a component at a time, so that the separator char doesn't
+ // get compared against component contents. Eg, "a/b" < "a-1/b".
+ Iterator<Path> left = iterator();
+ Iterator<Path> right = path.iterator();
+
+ while (left.hasNext() && right.hasNext()) {
+ String leftStr = left.next().toString();
+ String rightStr = right.next().toString();
+ int c = leftStr.compareTo(rightStr);
+ if (c != 0) {
+ return c;
+ }
+ }
+
+ if (!left.hasNext() && !right.hasNext()) {
+ return 0;
+ } else {
+ return left.hasNext() ? 1 : -1;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GcsPath paths = (GcsPath) o;
+ return bucket.equals(paths.bucket) && object.equals(paths.object);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = bucket.hashCode();
+ result = 31 * result + object.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ if (!isAbsolute()) {
+ return object;
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append(SCHEME)
+ .append("://");
+ if (!bucket.isEmpty()) {
+ sb.append(bucket)
+ .append('/');
+ }
+ sb.append(object);
+ return sb.toString();
+ }
+
+ // TODO: Consider using resource names for all GCS paths used by the SDK.
+ public String toResourceName() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("storage.googleapis.com/");
+ if (!bucket.isEmpty()) {
+ sb.append(bucket).append('/');
+ }
+ sb.append(object);
+ return sb.toString();
+ }
+
+ @Override
+ public URI toUri() {
+ try {
+ return new URI(SCHEME, "//" + bucketAndObject(), null);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unable to create URI for GCS path " + this, e); // keep cause
+ }
+ }
+
+ private String bucketAndObject() {
+ if (bucket.isEmpty()) {
+ return object;
+ } else {
+ return bucket + "/" + object;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
new file mode 100644
index 0000000..4d49f8c
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines utilities used to interact with Google Cloud Storage. */
+package org.apache.beam.sdk.util.gcsfs;
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
new file mode 100644
index 0000000..f8135e7
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */
+package org.apache.beam.sdk.util;
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
new file mode 100644
index 0000000..a8772c3
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** API surface verification for Google Cloud Platform core components. */
+@RunWith(JUnit4.class)
+public class GcpCoreApiSurfaceTest {
+
+ /** Package prefixes that the exposed API surface is permitted to contain. */
+ @SuppressWarnings("unchecked")
+ private static final Set<String> ALLOWED_PACKAGES =
+ ImmutableSet.of(
+ "org.apache.beam",
+ "com.google.api.client",
+ "com.google.api.services.storage",
+ "com.google.auth",
+ "com.fasterxml.jackson.annotation",
+ "com.fasterxml.jackson.core",
+ "com.fasterxml.jackson.databind",
+ "org.apache.avro",
+ "org.hamcrest",
+ // via DataflowMatchers
+ "org.codehaus.jackson",
+ // via Avro
+ "org.joda.time",
+ "org.junit");
+
+ /** Asserts that the SDK API surface seen by this class loader stays within the allow-list. */
+ @Test
+ public void testApiSurface() throws Exception {
+ ClassLoader classLoader = getClass().getClassLoader();
+ assertThat(
+ ApiSurface.getSdkApiSurface(classLoader), containsOnlyPackages(ALLOWED_PACKAGES));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
new file mode 100644
index 0000000..6f0846e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A stand-in {@link Credentials} for tests: it reports the authentication type {@code "Test"},
+ * supplies empty request metadata, and does nothing when refreshed.
+ */
+public class TestCredential extends Credentials {
+ /** Returns the fixed authentication type of this fake credential. */
+ @Override
+ public String getAuthenticationType() {
+ return "Test";
+ }
+
+ /** Returns an empty metadata map. */
+ @Override
+ public Map<String, List<String>> getRequestMetadata() throws IOException {
+ return noMetadata();
+ }
+
+ /** Returns an empty metadata map regardless of the given URI. */
+ @Override
+ public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
+ return noMetadata();
+ }
+
+ @Override
+ public boolean hasRequestMetadata() {
+ return false;
+ }
+
+ @Override
+ public boolean hasRequestMetadataOnly() {
+ return true;
+ }
+
+ /** Intentionally a no-op: there is nothing to refresh for a fake credential. */
+ @Override
+ public void refresh() throws IOException {
+ }
+
+ /** Shared empty result for both {@code getRequestMetadata} overloads. */
+ private static Map<String, List<String>> noMetadata() {
+ return Collections.emptyMap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
new file mode 100644
index 0000000..68b3818
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.api.services.storage.model.Bucket;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.GcpTempLocationFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcpOptions}. */
+@RunWith(Enclosed.class)
+public class GcpOptionsTest {
+
+ /** Tests for the majority of methods. */
+ @RunWith(JUnit4.class)
+ public static class Common {
+ // These tests overwrite system properties (os.name, user.home); this rule is declared so
+ // those changes do not leak into other tests.
+ @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+ // Scratch directory in which the fake gcloud configuration files are written.
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ // CLOUDSDK_CONFIG points at the temp root, where a file named "properties" is written.
+ @Test
+ public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
+ Map<String, String> environment =
+ ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project",
+ runGetProjectTest(tmpFolder.newFile("properties"), environment));
+ }
+
+ // APPDATA points at the temp root and os.name is set to "windows"; the file is written to
+ // <root>/gcloud/properties.
+ @Test
+ public void testGetProjectFromAppDataEnv() throws Exception {
+ Map<String, String> environment =
+ ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
+ System.setProperty("os.name", "windows");
+ assertEquals("test-project",
+ runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"),
+ environment));
+ }
+
+ // Empty environment, user.home set to the temp root; the file is written to the older
+ // location <home>/.config/gcloud/properties.
+ @Test
+ public void testGetProjectFromUserHomeEnvOld() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project",
+ runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
+ environment));
+ }
+
+ // Empty environment, user.home set to the temp root; the file is written to
+ // <home>/.config/gcloud/configurations/config_default.
+ @Test
+ public void testGetProjectFromUserHomeEnv() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project", runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+ environment));
+ }
+
+ // Both locations exist: the old "properties" file names "old-project" while
+ // "config_default" names "test-project"; the latter is expected to be returned.
+ @Test
+ public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ makePropertiesFileWithProject(
+ new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), "old-project");
+ assertEquals("test-project", runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+ environment));
+ }
+
+ // No configuration file is written and the environment is empty, so the factory is expected
+ // to return null.
+ @Test
+ public void testUnableToGetDefaultProject() throws Exception {
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+ when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
+ assertNull(projectFactory.create(PipelineOptionsFactory.create()));
+ }
+
+ // With an empty project string, reading gcpTempLocation is expected to fail with a message
+ // saying that --project is required.
+ @Test
+ public void testEmptyGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setProject("");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("--project is a required option");
+ options.getGcpTempLocation();
+ }
+
+ // With tempLocation set to a gs:// path and the no-op path validator installed,
+ // gcpTempLocation is expected to equal tempLocation.
+ @Test
+ public void testDefaultGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ String tempLocation = "gs://bucket";
+ options.setTempLocation(tempLocation);
+ options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
+ assertEquals(tempLocation, options.getGcpTempLocation());
+ }
+
+ // A tempLocation of "file://" is expected to be rejected when gcpTempLocation is read.
+ @Test
+ public void testDefaultGcpTempLocationInvalid() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setTempLocation("file://");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path");
+ options.getGcpTempLocation();
+ }
+
+ // A gs:// tempLocation that cannot be validated is expected to fail with the same message,
+ // and with a cause whose message mentions that the output path does not exist or is not
+ // writeable.
+ @Test
+ public void testDefaultGcpTempLocationDoesNotExist() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ String tempLocation = "gs://does/not/exist";
+ options.setTempLocation(tempLocation);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path");
+ thrown.expectCause(
+ hasMessage(containsString("Output path does not exist or is not writeable")));
+
+ options.getGcpTempLocation();
+ }
+
+ // Writes an INI-style file at the given path with a [core] section (account and the given
+ // project) followed by a [dataflow] section, encoded as UTF-8.
+ private static void makePropertiesFileWithProject(File path, String projectId)
+ throws IOException {
+ String properties = String.format("[core]%n"
+ + "account = test-account@google.com%n"
+ + "project = %s%n"
+ + "%n"
+ + "[dataflow]%n"
+ + "magic = true%n", projectId);
+ Files.write(properties, path, StandardCharsets.UTF_8);
+ }
+
+ // Writes a properties file naming "test-project" at the given path, then runs a spied
+ // DefaultProjectFactory whose getEnvironment() is stubbed to return the supplied map, and
+ // returns the project it produces.
+ private static String runGetProjectTest(File path, Map<String, String> environment)
+ throws Exception {
+ makePropertiesFileWithProject(path, "test-project");
+ DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+ when(projectFactory.getEnvironment()).thenReturn(environment);
+ return projectFactory.create(PipelineOptionsFactory.create());
+ }
+ }
+
+ /** Tests related to determining the GCP temp location. */
+ @RunWith(JUnit4.class)
+ public static class GcpTempLocation {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ @Mock private GcsUtil mockGcsUtil;
+ @Mock private CloudResourceManager mockCrmClient;
+ @Mock private Projects mockProjects;
+ @Mock private Get mockGet;
+ private Project fakeProject;
+ private PipelineOptions options;
+
+ // Initializes the mocks, builds options with project "foo" and zone "us-north1-a" that use
+ // the mocked GcsUtil, and wires mockCrmClient.projects().get(<any>) to return mockGet.
+ // The fake project carries project number 1.
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ options = PipelineOptionsFactory.create();
+ options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
+ options.as(GcpOptions.class).setProject("foo");
+ options.as(GcpOptions.class).setZone("us-north1-a");
+ when(mockCrmClient.projects()).thenReturn(mockProjects);
+ when(mockProjects.get(any(String.class))).thenReturn(mockGet);
+ fakeProject = new Project().setProjectNumber(1L);
+ }
+
+ // Project lookup succeeds and the reported bucket owner (1) equals the fake project's
+ // number; the expected bucket name is "gs://dataflow-staging-us-north1-1".
+ @Test
+ public void testCreateBucket() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L);
+
+ String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ assertEquals("gs://dataflow-staging-us-north1-1", bucket);
+ }
+
+ // The project lookup throws an IOException; a RuntimeException mentioning
+ // "Unable to verify project" is expected.
+ @Test
+ public void testCreateBucketProjectLookupFails() throws Exception {
+ doThrow(new IOException("badness")).when(mockGet).execute();
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to verify project");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ // Bucket creation throws an IOException; a RuntimeException with the message below is
+ // expected.
+ @Test
+ public void testCreateBucketCreateBucketFails() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ doThrow(new IOException("badness")).when(
+ mockGcsUtil).createBucket(any(String.class), any(Bucket.class));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable create default bucket");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ // The bucket-owner lookup throws an IOException; a RuntimeException mentioning
+ // "Unable to determine the owner" is expected.
+ @Test
+ public void testCannotGetBucketOwner() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ when(mockGcsUtil.bucketOwner(any(GcsPath.class)))
+ .thenThrow(new IOException("badness"));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to determine the owner");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ // The reported bucket owner (5) differs from the fake project's number (1); an
+ // IllegalArgumentException about the owner mismatch is expected.
+ @Test
+ public void testProjectMismatch() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Bucket owner does not match the project");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ // In both examples the expected region is the zone name without its final "-a" suffix.
+ @Test
+ public void regionFromZone() throws Exception {
+ assertEquals("us-central1", GcpTempLocationFactory.getRegionFromZone("us-central1-a"));
+ assertEquals("asia-east", GcpTempLocationFactory.getRegionFromZone("asia-east-a"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
new file mode 100644
index 0000000..67d5880
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Delete;
+import com.google.api.services.storage.Storage;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GoogleApiDebugOptions.GoogleApiTracer;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.Transport;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GoogleApiDebugOptions}. */
+@RunWith(JUnit4.class)
+public class GoogleApiDebugOptionsTest {
+ private static final String STORAGE_GET_TRACE =
+ "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
+ private static final String STORAGE_GET_AND_LIST_TRACE =
+ "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\","
+ + "\"Objects.List\":\"ListTraceDestination\"}";
+ private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}";
+
+ /** Parses a single command-line flag into {@link GcsOptions} carrying a fake credential. */
+ private static GcsOptions optionsFromFlag(String flag) {
+ String[] args = new String[] {flag};
+ GcsOptions parsed = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+ parsed.setGcpCredential(new TestCredential());
+ return parsed;
+ }
+
+ /** Creates {@link GcsOptions} without any flags, carrying a fake credential. */
+ private static GcsOptions optionsWithoutFlags() {
+ GcsOptions created = PipelineOptionsFactory.as(GcsOptions.class);
+ created.setGcpCredential(new TestCredential());
+ return created;
+ }
+
+ /** Builds a fresh storage client from the options and creates an object "get" request. */
+ private static Storage.Objects.Get newGetRequest(GcsOptions options) throws Exception {
+ return Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+ }
+
+ /** Builds a fresh storage client from the options and creates an object "list" request. */
+ private static Storage.Objects.List newListRequest(GcsOptions options) throws Exception {
+ return Transport.newStorageClient(options).build().objects().list("testProjectId");
+ }
+
+ /** A trace configured for {@code Objects.Get} is applied to a get request. */
+ @Test
+ public void testWhenTracingMatches() throws Exception {
+ GcsOptions options = optionsFromFlag(STORAGE_GET_TRACE);
+ assertNotNull(options.getGoogleApiTrace());
+
+ assertEquals("GetTraceDestination", newGetRequest(options).get("$trace"));
+ }
+
+ /** A trace configured for {@code Objects.Get} is not applied to a list request. */
+ @Test
+ public void testWhenTracingDoesNotMatch() throws Exception {
+ GcsOptions options = optionsFromFlag(STORAGE_GET_TRACE);
+ assertNotNull(options.getGoogleApiTrace());
+
+ assertNull(newListRequest(options).get("$trace"));
+ }
+
+ /** Separate traces for get and list are each applied to the matching request. */
+ @Test
+ public void testWithMultipleTraces() throws Exception {
+ GcsOptions options = optionsFromFlag(STORAGE_GET_AND_LIST_TRACE);
+ assertNotNull(options.getGoogleApiTrace());
+
+ assertEquals("GetTraceDestination", newGetRequest(options).get("$trace"));
+ assertEquals("ListTraceDestination", newListRequest(options).get("$trace"));
+ }
+
+ /** A trace keyed on {@code Storage} is applied to both get and list requests. */
+ @Test
+ public void testMatchingAllCalls() throws Exception {
+ GcsOptions options = optionsFromFlag(STORAGE_TRACE);
+ assertNotNull(options.getGoogleApiTrace());
+
+ assertEquals("TraceDestination", newGetRequest(options).get("$trace"));
+ assertEquals("TraceDestination", newListRequest(options).get("$trace"));
+ }
+
+ /**
+ * A trace registered against a storage client is applied to a storage request but not to a
+ * Cloud Resource Manager request.
+ */
+ @Test
+ public void testMatchingAgainstClient() throws Exception {
+ GcsOptions options = optionsWithoutFlags();
+ options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+ Transport.newStorageClient(options).build(), "TraceDestination"));
+
+ assertEquals("TraceDestination", newGetRequest(options).get("$trace"));
+
+ Delete deleteRequest = GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient(
+ options.as(CloudResourceManagerOptions.class))
+ .build().projects().delete("testProjectId");
+ assertNull(deleteRequest.get("$trace"));
+ }
+
+ /** A trace registered against a get request is applied to get requests but not to list. */
+ @Test
+ public void testMatchingAgainstRequestType() throws Exception {
+ GcsOptions options = optionsWithoutFlags();
+ options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+ Transport.newStorageClient(options).build().objects()
+ .get("aProjectId", "aObjectId"), "TraceDestination"));
+
+ assertEquals("TraceDestination", newGetRequest(options).get("$trace"));
+ assertNull(newListRequest(options).get("$trace"));
+ }
+
+ /** Reading a tracer from JSON and writing it back yields the same JSON text. */
+ @Test
+ public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception {
+ String serializedValue = "{\"Api\":\"Token\"}";
+ ObjectMapper objectMapper = new ObjectMapper();
+ GoogleApiTracer tracer = objectMapper.readValue(serializedValue, GoogleApiTracer.class);
+ assertEquals(serializedValue, objectMapper.writeValueAsString(tracer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
new file mode 100644
index 0000000..a29dd45
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsIOChannelFactoryRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryRegistrarTest {
+
+ @Test
+ public void testServiceLoader() {
+ for (IOChannelFactoryRegistrar registrar
+ : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+ if (registrar instanceof GcsIOChannelFactoryRegistrar) {
+ return;
+ }
+ }
+ fail("Expected to find " + GcsIOChannelFactoryRegistrar.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
new file mode 100644
index 0000000..f53490a
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GcsIOChannelFactory}. */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryTest {
+ // Factory under test; rebuilt before every test from GcsOptions created without arguments.
+ private GcsIOChannelFactory factory;
+
+ @Before
+ public void setUp() {
+ factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class));
+ }
+
+ // Resolving "object" against "gs://bucket" is expected to yield "gs://bucket/object".
+ @Test
+ public void testResolve() throws Exception {
+ assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..65fb228
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private GcsPathValidator validator;
+
+  /**
+   * Initializes the mocks, stubs {@code bucketAccessible} to report every bucket as accessible,
+   * and builds the validator from options wired to the mocked {@link GcsUtil}.
+   */
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+
+    GcsOptions gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    gcsOptions.setGcpCredential(new TestCredential());
+    gcsOptions.setGcsUtil(mockGcsUtil);
+    validator = GcsPathValidator.fromOptions(gcsOptions);
+  }
+
+  /**
+   * Arms the {@link ExpectedException} rule so the rest of the test must throw an
+   * {@link IllegalArgumentException} whose message contains {@code message}.
+   */
+  private void expectIllegalArgument(String message) {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(message);
+  }
+
+  /** A well-formed {@code gs://bucket/path} input pattern is accepted without throwing. */
+  @Test
+  public void testValidFilePattern() {
+    validator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  /** A local path is rejected as an input pattern. */
+  @Test
+  public void testInvalidFilePattern() {
+    expectIllegalArgument("Expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateInputFilePatternSupported("/local/path");
+  }
+
+  /** An input pattern consisting of a single {@code gs://} component is rejected. */
+  @Test
+  public void testFilePatternMissingBucket() {
+    expectIllegalArgument(
+        "Missing object or bucket in path: 'gs://input/', "
+            + "did you mean: 'gs://some-bucket/input'?");
+    validator.validateInputFilePatternSupported("gs://input");
+  }
+
+  /** When the mocked {@link GcsUtil} reports the bucket as inaccessible, validation fails. */
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+    expectIllegalArgument("Could not find file gs://non-existent-bucket/location");
+    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  /** A well-formed {@code gs://bucket/path} output prefix is accepted without throwing. */
+  @Test
+  public void testValidOutputPrefix() {
+    validator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  /** A local path is rejected as an output prefix. */
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectIllegalArgument("Expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateOutputFilePrefixSupported("/local/path");
+  }
+
+  /** An output prefix consisting of a single {@code gs://} component is rejected. */
+  @Test
+  public void testOutputPrefixMissingBucket() {
+    expectIllegalArgument(
+        "Missing object or bucket in path: 'gs://output/', "
+            + "did you mean: 'gs://some-bucket/output'?");
+    validator.validateOutputFilePrefixSupported("gs://output");
+  }
+}