You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/02 17:57:07 UTC

[2/7] beam git commit: [BEAM-2135] Move gcp-core to google-cloud-platform-core

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
new file mode 100644
index 0000000..2b7135e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseInterceptor;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a request initializer that adds retry handlers to all
+ * HttpRequests.
+ *
+ * <p>Can also take an HttpResponseInterceptor to be applied to the responses.
+ */
+public class RetryHttpRequestInitializer implements HttpRequestInitializer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RetryHttpRequestInitializer.class);
+
+  /**
+   * Http response codes that should be silently ignored.
+   */
+  private static final Set<Integer> DEFAULT_IGNORED_RESPONSE_CODES = new HashSet<>(
+      Arrays.asList(307 /* Redirect, handled by the client library */,
+                    308 /* Resume Incomplete, handled by the client library */));
+
+  /**
+   * Http response timeout to use for hanging gets.
+   */
+  private static final int HANGING_GET_TIMEOUT_SEC = 80;
+
+  private static class LoggingHttpBackOffIOExceptionHandler
+      extends HttpBackOffIOExceptionHandler {
+    public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) {
+      super(backOff);
+    }
+
+    @Override
+    public boolean handleIOException(HttpRequest request, boolean supportsRetry)
+        throws IOException {
+      boolean willRetry = super.handleIOException(request, supportsRetry);
+      if (willRetry) {
+        LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
+      } else {
+        LOG.warn("Request failed with IOException, will NOT retry: {}", request.getUrl());
+      }
+      return willRetry;
+    }
+  }
+
+  private static class LoggingHttpBackoffUnsuccessfulResponseHandler
+      implements HttpUnsuccessfulResponseHandler {
+    private final HttpBackOffUnsuccessfulResponseHandler handler;
+    private final Set<Integer> ignoredResponseCodes;
+
+    public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff,
+        Sleeper sleeper, Set<Integer> ignoredResponseCodes) {
+      this.ignoredResponseCodes = ignoredResponseCodes;
+      handler = new HttpBackOffUnsuccessfulResponseHandler(backoff);
+      handler.setSleeper(sleeper);
+      handler.setBackOffRequired(
+          new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() {
+            @Override
+            public boolean isRequired(HttpResponse response) {
+              int statusCode = response.getStatusCode();
+              return (statusCode / 100 == 5) ||  // 5xx: server error
+                  statusCode == 429;             // 429: Too many requests
+            }
+          });
+    }
+
+    @Override
+    public boolean handleResponse(HttpRequest request, HttpResponse response,
+        boolean supportsRetry) throws IOException {
+      boolean retry = handler.handleResponse(request, response, supportsRetry);
+      if (retry) {
+        LOG.debug("Request failed with code {} will retry: {}",
+            response.getStatusCode(), request.getUrl());
+
+      } else if (!ignoredResponseCodes.contains(response.getStatusCode())) {
+        LOG.warn("Request failed with code {}, will NOT retry: {}",
+            response.getStatusCode(), request.getUrl());
+      }
+
+      return retry;
+    }
+  }
+
+  private final HttpResponseInterceptor responseInterceptor;  // response Interceptor to use
+
+  private final NanoClock nanoClock;  // used for testing
+
+  private final Sleeper sleeper;  // used for testing
+
+  private Set<Integer> ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);
+
+  public RetryHttpRequestInitializer() {
+    this(Collections.<Integer>emptyList());
+  }
+
+  /**
+   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+   */
+  public RetryHttpRequestInitializer(Collection<Integer> additionalIgnoredResponseCodes) {
+    this(additionalIgnoredResponseCodes, null);
+  }
+
+  /**
+   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
+   */
+  public RetryHttpRequestInitializer(
+      Collection<Integer> additionalIgnoredResponseCodes,
+      @Nullable HttpResponseInterceptor responseInterceptor) {
+    this(NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes,
+        responseInterceptor);
+  }
+
+  /**
+   * Visible for testing.
+   *
+   * @param nanoClock used as a timing source for knowing how much time has elapsed.
+   * @param sleeper used to sleep between retries.
+   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+   */
+  RetryHttpRequestInitializer(
+      NanoClock nanoClock, Sleeper sleeper, Collection<Integer> additionalIgnoredResponseCodes,
+      HttpResponseInterceptor responseInterceptor) {
+    this.nanoClock = nanoClock;
+    this.sleeper = sleeper;
+    this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
+    this.responseInterceptor = responseInterceptor;
+  }
+
+  @Override
+  public void initialize(HttpRequest request) throws IOException {
+    // Set a read timeout for hanging gets; the constant is in seconds, hence the * 1000.
+    // TODO: Do this exclusively for work requests.
+    request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
+
+    // Back off exponentially on retryable HTTP errors, logging each decision.
+    request.setUnsuccessfulResponseHandler(
+        // A back-off multiplier of 2 raises the maximum request retrying time
+        // to approximately 5 minutes (keeping other back-off parameters to
+        // their default values).
+        new LoggingHttpBackoffUnsuccessfulResponseHandler(
+            new ExponentialBackOff.Builder().setNanoClock(nanoClock)
+                                            .setMultiplier(2).build(),
+            sleeper, ignoredResponseCodes));
+
+    // Retry immediately (zero back-off) on IOExceptions, logging each decision.
+    LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler =
+        new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF);
+    request.setIOExceptionHandler(loggingBackoffHandler);
+
+    // Install the response interceptor, if one was supplied at construction time.
+    if (responseInterceptor != null) {
+      request.setResponseInterceptor(responseInterceptor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
new file mode 100644
index 0000000..b8474bb
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.storage.Storage;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class Transport {
+
+  private static class SingletonHelper {
+    /** Global instance of the JSON factory. */
+    private static final JsonFactory JSON_FACTORY;
+
+    /** Global instance of the HTTP transport. */
+    private static final HttpTransport HTTP_TRANSPORT;
+
+    static {
+      try {
+        JSON_FACTORY = JacksonFactory.getDefaultInstance();
+        HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
+      } catch (GeneralSecurityException | IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public static HttpTransport getTransport() {
+    return SingletonHelper.HTTP_TRANSPORT;
+  }
+
+  public static JsonFactory getJsonFactory() {
+    return SingletonHelper.JSON_FACTORY;
+  }
+
+  private static class ApiComponents {
+    public String rootUrl;
+    public String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    // Splits an endpoint URL into its root ("scheme://host[:port]") and service path.
+    try {
+      URL url = new URL(urlString);
+      String port = url.getPort() > 0 ? ":" + url.getPort() : "";  // getPort() is -1 if unset
+      return new ApiComponents(url.getProtocol() + "://" + url.getHost() + port, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString, e);  // keep the parse failure
+    }
+  }
+
+  /**
+   * Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
+   */
+  public static Storage.Builder
+      newStorageClient(GcsOptions options) {
+    String servicePath = options.getGcsEndpoint();
+    Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log the code 404. Code up the stack will deal with 404's if needed, and
+            // logging it by default clutters the output during file staging.
+            new RetryHttpRequestInitializer(
+                ImmutableList.of(404), new UploadIdResponseInterceptor())))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+    if (servicePath != null) {
+      ApiComponents components = apiComponentsFromUrl(servicePath);
+      storageBuilder.setRootUrl(components.rootUrl);
+      storageBuilder.setServicePath(components.servicePath);
+    }
+    return storageBuilder;
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return new ChainingHttpRequestInitializer(
+          new NullCredentialInitializer(), httpRequestInitializer);
+    } else {
+      return new ChainingHttpRequestInitializer(
+          new HttpCredentialsAdapter(credential),
+          httpRequestInitializer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
new file mode 100644
index 0000000..6a71bdc
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.gcsfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.google.api.services.storage.model.StorageObject;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Implements the Java NIO {@link Path} API for Google Cloud Storage paths.
+ *
+ * <p>GcsPath uses a slash ('/') as a directory separator.  Below is
+ * a summary of how slashes are treated:
+ * <ul>
+ *   <li> A GCS bucket may not contain a slash.  An object may contain zero or
+ *        more slashes.
+ *   <li> A trailing slash always indicates a directory, which is compliant
+ *        with POSIX.1-2008.
+ *   <li> Slashes separate components of a path.  Empty components are allowed,
+ *        these are represented as repeated slashes.  An empty component always
+ *        refers to a directory, and always ends in a slash.
+ *   <li> {@link #getParent()} always returns a path ending in a slash, as the
+ *        parent of a GcsPath is always a directory.
+ *   <li> Use {@link #resolve(String)} to append elements to a GcsPath -- this
+ *        applies the rules consistently and is highly recommended over any
+ *        custom string concatenation.
+ * </ul>
+ *
+ * <p>GcsPath treats all GCS objects and buckets as belonging to the same
+ * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="".
+ *
+ * <p>Relative paths are not associated with any bucket.  This matches common
+ * treatment of Path in which relative paths can be constructed from one
+ * filesystem and appended to another filesystem.
+ *
+ * @see <a href=
+ * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
+ * >Java Tutorials: Path Operations</a>
+ */
+public class GcsPath implements Path, Serializable {
+
+  public static final String SCHEME = "gs";
+
+  /**
+   * Creates a GcsPath from a URI of the form {@code gs://[bucket]/[path]}.
+   *
+   * @throws IllegalArgumentException if the scheme is missing or is not {@code gs}, or if
+   *     the URI specifies a port, user info, a query, or a fragment.
+   */
+  public static GcsPath fromUri(URI uri) {
+    // Compare constant-first: uri.getScheme() is null for scheme-less (relative) URIs.
+    checkArgument(SCHEME.equalsIgnoreCase(uri.getScheme()), "URI: %s is not a GCS URI", uri);
+    // Guava's message templates substitute only %s, so every argument uses %s.
+    checkArgument(uri.getPort() == -1,
+        "GCS URI may not specify port: %s (%s)", uri, uri.getPort());
+    checkArgument(isNullOrEmpty(uri.getUserInfo()),
+        "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo());
+    checkArgument(isNullOrEmpty(uri.getQuery()),
+        "GCS URI may not specify query: %s (%s)", uri, uri.getQuery());
+    checkArgument(isNullOrEmpty(uri.getFragment()),
+        "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment());
+
+    // Bucket/object extraction is shared with the string-based overload.
+    return fromUri(uri.toString());
+  }
+
+  /**
+   * Pattern that is used to parse a GCS URL.
+   *
+   * <p>This is used to separate the components.  Verification is handled
+   * separately.
+   */
+  public static final Pattern GCS_URI =
+      Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+  /**
+   * Creates a GcsPath from a URI in string form.
+   *
+   * <p>This does not use URI parsing, which means it may accept patterns that
+   * the URI parser would not accept.
+   */
+  public static GcsPath fromUri(String uri) {
+    Matcher m = GCS_URI.matcher(uri);
+    checkArgument(m.matches(), "Invalid GCS URI: %s", uri);
+
+    checkArgument(m.group("SCHEME").equalsIgnoreCase(SCHEME),
+        "URI: %s is not a GCS URI", uri);
+    return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
+  }
+
+  /**
+   * Pattern that is used to parse a GCS resource name.
+   */
+  private static final Pattern GCS_RESOURCE_NAME =
+      Pattern.compile("storage.googleapis.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+  /**
+   * Creates a GcsPath from a OnePlatform resource name in string form.
+   */
+  public static GcsPath fromResourceName(String name) {
+    Matcher m = GCS_RESOURCE_NAME.matcher(name);
+    checkArgument(m.matches(), "Invalid GCS resource name: %s", name);
+
+    return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
+  }
+
+  /**
+   * Creates a GcsPath from a {@linkplain StorageObject}.
+   */
+  public static GcsPath fromObject(StorageObject object) {
+    return new GcsPath(null, object.getBucket(), object.getName());
+  }
+
+  /**
+   * Creates a GcsPath from bucket and object components.
+   *
+   * <p>A GcsPath without a bucket name is treated as a relative path, which
+   * is a path component with no linkage to the root element.  This is similar
+   * to a Unix path that does not begin with the root marker (a slash).
+   * GCS has different naming constraints and APIs for working with buckets and
+   * objects, so these two concepts are kept separate to avoid accidental
+   * attempts to treat objects as buckets, or vice versa, as much as possible.
+   *
+   * <p>A GcsPath without an object name is a bucket reference.
+   * A bucket is always a directory, which could be used to lookup or add
+   * files to a bucket, but could not be opened as a file.
+   *
+   * <p>A GcsPath containing neither bucket nor object names is treated as
+   * the root of the GCS filesystem.  A listing on the root element would return
+   * the buckets available to the user.
+   *
+   * <p>If {@code null} is passed as either parameter, it is converted to an
+   * empty string internally for consistency.  There is no distinction between
+   * an empty string and a {@code null}, as neither are allowed by GCS.
+   *
+   * @param bucket a GCS bucket name, or none ({@code null} or an empty string)
+   *               if the object is not associated with a bucket
+   *               (e.g. relative paths or the root node).
+   * @param object a GCS object path, or none ({@code null} or an empty string)
+   *               for no object.
+   */
+  public static GcsPath fromComponents(@Nullable String bucket,
+                                       @Nullable String object) {
+    return new GcsPath(null, bucket, object);
+  }
+
+  @Nullable
+  private transient FileSystem fs;
+  @Nonnull
+  private final String bucket;
+  @Nonnull
+  private final String object;
+
+  /**
+   * Constructs a GcsPath.
+   *
+   * @param fs the associated FileSystem, if any
+   * @param bucket the associated bucket, or none ({@code null} or an empty
+   *               string) for a relative path component
+   * @param object the object, which is a fully-qualified object name if bucket
+   *               was also provided, or none ({@code null} or an empty string)
+   *               for no object
+   * @throws java.lang.IllegalArgumentException if the bucket or object names
+   *         are invalid.
+   */
+  public GcsPath(@Nullable FileSystem fs,
+                 @Nullable String bucket,
+                 @Nullable String object) {
+    if (bucket == null) {
+      bucket = "";
+    }
+    checkArgument(!bucket.contains("/"),
+        "GCS bucket may not contain a slash");
+    checkArgument(bucket.isEmpty()
+                || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"),
+            "GCS bucket names must contain only lowercase letters, numbers, "
+                + "dashes (-), underscores (_), and dots (.). Bucket names "
+                + "must start and end with a number or letter. "
+                + "See https://developers.google.com/storage/docs/bucketnaming "
+                + "for more details.  Bucket name: " + bucket);
+
+    if (object == null) {
+      object = "";
+    }
+    checkArgument(
+        object.indexOf('\n') < 0 && object.indexOf('\r') < 0,
+        "GCS object names must not contain Carriage Return or "
+            + "Line Feed characters.");
+
+    this.fs = fs;
+    this.bucket = bucket;
+    this.object = object;
+  }
+
+  /**
+   * Returns the bucket name associated with this GCS path, or an empty string
+   * if this is a relative path component.
+   */
+  public String getBucket() {
+    return bucket;
+  }
+
+  /**
+   * Returns the object name associated with this GCS path, or an empty string
+   * if no object is specified.
+   */
+  public String getObject() {
+    return object;
+  }
+
+  public void setFileSystem(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+
+  // Absolute paths are those that have a bucket, plus the root path itself.
+  @Override
+  public boolean isAbsolute() {
+    return !bucket.isEmpty() || object.isEmpty();
+  }
+
+  @Override
+  public GcsPath getRoot() {
+    return new GcsPath(fs, "", "");
+  }
+
+  @Override
+  public GcsPath getFileName() {
+    int nameCount = getNameCount();
+    if (nameCount < 2) {
+      throw new UnsupportedOperationException(
+          "Can't get filename from root path in the bucket: " + this);
+    }
+    return getName(nameCount - 1);
+  }
+
+  /**
+   * Returns the <em>parent path</em>, or {@code null} if this path does not
+   * have a parent.
+   *
+   * <p>Returns a path that ends in '/', as the parent path always refers to
+   * a directory.
+   */
+  @Override
+  public GcsPath getParent() {
+    if (bucket.isEmpty() && object.isEmpty()) {
+      // The root path has no parent, by definition.
+      return null;
+    }
+
+    if (object.isEmpty()) {
+      // A GCS bucket. All buckets come from a common root.
+      return getRoot();
+    }
+
+    // Skip last character, in case it is a trailing slash.
+    int i = object.lastIndexOf('/', object.length() - 2);
+    if (i <= 0) {
+      if (bucket.isEmpty()) {
+        // Relative paths are not attached to the root node.
+        return null;
+      }
+      return new GcsPath(fs, bucket, "");
+    }
+
+    // Retain trailing slash.
+    return new GcsPath(fs, bucket, object.substring(0, i + 1));
+  }
+
+  @Override
+  public int getNameCount() {
+    int count = bucket.isEmpty() ? 0 : 1;
+    if (object.isEmpty()) {
+      return count;
+    }
+
+    // Add another for each separator found.
+    int index = -1;
+    while ((index = object.indexOf('/', index + 1)) != -1) {
+      count++;
+    }
+
+    return object.endsWith("/") ? count : count + 1;
+  }
+
+  @Override
+  public GcsPath getName(int count) {
+    checkArgument(count >= 0);
+
+    Iterator<Path> iterator = iterator();
+    for (int i = 0; i < count; ++i) {
+      checkArgument(iterator.hasNext());
+      iterator.next();
+    }
+
+    checkArgument(iterator.hasNext());
+    return (GcsPath) iterator.next();
+  }
+
+  @Override
+  public GcsPath subpath(int beginIndex, int endIndex) {
+    checkArgument(beginIndex >= 0);
+    checkArgument(endIndex > beginIndex);
+
+    Iterator<Path> iterator = iterator();
+    for (int i = 0; i < beginIndex; ++i) {
+      checkArgument(iterator.hasNext());
+      iterator.next();
+    }
+
+    GcsPath path = null;
+    while (beginIndex < endIndex) {
+      checkArgument(iterator.hasNext());
+      if (path == null) {
+        path = (GcsPath) iterator.next();
+      } else {
+        path = path.resolve(iterator.next());
+      }
+      ++beginIndex;
+    }
+
+    return path;
+  }
+
+  @Override
+  public boolean startsWith(Path other) {
+    if (other instanceof GcsPath) {
+      GcsPath gcsPath = (GcsPath) other;
+      return startsWith(gcsPath.bucketAndObject());
+    } else {
+      return startsWith(other.toString());
+    }
+  }
+
+  @Override
+  public boolean startsWith(String prefix) {
+    return bucketAndObject().startsWith(prefix);
+  }
+
+  @Override
+  public boolean endsWith(Path other) {
+    if (other instanceof GcsPath) {
+      GcsPath gcsPath = (GcsPath) other;
+      return endsWith(gcsPath.bucketAndObject());
+    } else {
+      return endsWith(other.toString());
+    }
+  }
+
+  @Override
+  public boolean endsWith(String suffix) {
+    return bucketAndObject().endsWith(suffix);
+  }
+
+  // TODO: support "." and ".." path components?
+  @Override
+  public GcsPath normalize() {
+    return this;
+  }
+
+  @Override
+  public GcsPath resolve(Path other) {
+    if (other instanceof GcsPath) {
+      GcsPath path = (GcsPath) other;
+      if (path.isAbsolute()) {
+        return path;
+      } else {
+        return resolve(path.getObject());
+      }
+    } else {
+      return resolve(other.toString());
+    }
+  }
+
+  @Override
+  public GcsPath resolve(String other) {
+    if (bucket.isEmpty() && object.isEmpty()) {
+      // Resolve on a root path is equivalent to looking up a bucket and object.
+      other = SCHEME + "://" + other;
+    }
+
+    if (other.startsWith(SCHEME + "://")) {
+      GcsPath path = GcsPath.fromUri(other);
+      path.setFileSystem(getFileSystem());
+      return path;
+    }
+
+    if (other.isEmpty()) {
+      // An empty component MUST refer to a directory.
+      other = "/";
+    }
+
+    if (object.isEmpty()) {
+      return new GcsPath(fs, bucket, other);
+    } else if (object.endsWith("/")) {
+      return new GcsPath(fs, bucket, object + other);
+    } else {
+      return new GcsPath(fs, bucket, object + "/" + other);
+    }
+  }
+
+  @Override
+  public Path resolveSibling(Path other) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Path resolveSibling(String other) {
+    if (getNameCount() < 2) {
+      throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this);
+    }
+    GcsPath parent = getParent();
+    return (parent == null) ? fromUri(other) : parent.resolve(other);
+  }
+
+  @Override
+  public Path relativize(Path other) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public GcsPath toAbsolutePath() {
+    return this;
+  }
+
+  @Override
+  public GcsPath toRealPath(LinkOption... options) throws IOException {
+    return this;
+  }
+
+  @Override
+  public File toFile() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events,
+      WatchEvent.Modifier... modifiers) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<Path> iterator() {
+    return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject());
+  }
+
+  private static class NameIterator implements Iterator<Path> {
+    private final FileSystem fs;
+    private boolean fullPath;
+    private String name;
+
+    NameIterator(FileSystem fs, boolean fullPath, String name) {
+      this.fs = fs;
+      this.fullPath = fullPath;
+      this.name = name;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !isNullOrEmpty(name);
+    }
+
+    @Override
+    public GcsPath next() {
+      // Honor the Iterator contract instead of failing with a NullPointerException.
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException("No more path components");
+      }
+      // Split off everything before the first slash; the rest stays queued in name.
+      int i = name.indexOf('/');
+      String component = (i >= 0) ? name.substring(0, i) : name;
+      name = (i >= 0) ? name.substring(i + 1) : null;
+      if (fullPath) {
+        // Only the first component of a bucket-qualified path is the bucket.
+        fullPath = false;
+        return new GcsPath(fs, component, "");
+      } else {
+        // Relative paths have no bucket.
+        return new GcsPath(fs, "", component);
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public int compareTo(Path other) {
+    if (!(other instanceof GcsPath)) {
+      throw new ClassCastException();
+    }
+
+    GcsPath path = (GcsPath) other;
+    int b = bucket.compareTo(path.bucket);
+    if (b != 0) {
+      return b;
+    }
+
+    // Compare a component at a time, so that the separator char doesn't
+    // get compared against component contents.  Eg, "a/b" < "a-1/b".
+    Iterator<Path> left = iterator();
+    Iterator<Path> right = path.iterator();
+
+    while (left.hasNext() && right.hasNext()) {
+      String leftStr = left.next().toString();
+      String rightStr = right.next().toString();
+      int c = leftStr.compareTo(rightStr);
+      if (c != 0) {
+        return c;
+      }
+    }
+
+    if (!left.hasNext() && !right.hasNext()) {
+      return 0;
+    } else {
+      return left.hasNext() ? 1 : -1;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    GcsPath paths = (GcsPath) o;
+    return bucket.equals(paths.bucket) && object.equals(paths.object);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = bucket.hashCode();
+    result = 31 * result + object.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    if (!isAbsolute()) {
+      return object;
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append(SCHEME)
+        .append("://");
+    if (!bucket.isEmpty()) {
+      sb.append(bucket)
+          .append('/');
+    }
+    sb.append(object);
+    return sb.toString();
+  }
+
+  // TODO: Consider using resource names for all GCS paths used by the SDK.
+  public String toResourceName() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("storage.googleapis.com/");
+    if (!bucket.isEmpty()) {
+      sb.append(bucket).append('/');
+    }
+    sb.append(object);
+    return sb.toString();
+  }
+
+  @Override
+  public URI toUri() {
+    try {
+      return new URI(SCHEME, "//" + bucketAndObject(), null);  // null: no fragment
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Unable to create URI for GCS path " + this, e);  // keep cause
+    }
+  }
+
+  private String bucketAndObject() {
+    if (bucket.isEmpty()) {
+      return object;
+    } else {
+      return bucket + "/" + object;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
new file mode 100644
index 0000000..4d49f8c
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines utilities used to interact with Google Cloud Storage. */
+package org.apache.beam.sdk.util.gcsfs;

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
new file mode 100644
index 0000000..f8135e7
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */
+package org.apache.beam.sdk.util;

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
new file mode 100644
index 0000000..a8772c3
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** API surface verification for Google Cloud Platform core components. */
+@RunWith(JUnit4.class)
+public class GcpCoreApiSurfaceTest {
+
+  @Test
+  public void testApiSurface() throws Exception {
+
+    @SuppressWarnings("unchecked")
+    final Set<String> allowed =
+        ImmutableSet.of(
+            "org.apache.beam",
+            "com.google.api.client",
+            "com.google.api.services.storage",
+            "com.google.auth",
+            "com.fasterxml.jackson.annotation",
+            "com.fasterxml.jackson.core",
+            "com.fasterxml.jackson.databind",
+            "org.apache.avro",
+            "org.hamcrest",
+            // via DataflowMatchers
+            "org.codehaus.jackson",
+            // via Avro
+            "org.joda.time",
+            "org.junit");
+
+    assertThat(
+        ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
new file mode 100644
index 0000000..6f0846e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.auth;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fake credential, for use in testing.
+ */
+public class TestCredential extends Credentials {
+  @Override
+  public String getAuthenticationType() {
+    return "Test";
+  }
+
+  @Override
+  public Map<String, List<String>> getRequestMetadata() throws IOException {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public boolean hasRequestMetadata() {
+    return false;
+  }
+
+  @Override
+  public boolean hasRequestMetadataOnly() {
+    return true;
+  }
+
+  @Override
+  public void refresh() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
new file mode 100644
index 0000000..68b3818
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.api.services.storage.model.Bucket;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.GcpTempLocationFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcpOptions}. */
+@RunWith(Enclosed.class)
+public class GcpOptionsTest {
+
+  /** Tests for the majority of methods. */
+  @RunWith(JUnit4.class)
+  public static class Common {
+    @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
+      Map<String, String> environment =
+          ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath());
+      assertEquals("test-project",
+          runGetProjectTest(tmpFolder.newFile("properties"), environment));
+    }
+
+    @Test
+    public void testGetProjectFromAppDataEnv() throws Exception {
+      Map<String, String> environment =
+          ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
+      System.setProperty("os.name", "windows");
+      assertEquals("test-project",
+          runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"),
+              environment));
+    }
+
+    @Test
+    public void testGetProjectFromUserHomeEnvOld() throws Exception {
+      Map<String, String> environment = ImmutableMap.of();
+      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+      assertEquals("test-project",
+          runGetProjectTest(
+              new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
+              environment));
+    }
+
+    @Test
+    public void testGetProjectFromUserHomeEnv() throws Exception {
+      Map<String, String> environment = ImmutableMap.of();
+      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+      assertEquals("test-project", runGetProjectTest(
+          new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+          environment));
+    }
+
+    @Test
+    public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception {
+      Map<String, String> environment = ImmutableMap.of();
+      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+      makePropertiesFileWithProject(
+          new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), "old-project");
+      assertEquals("test-project", runGetProjectTest(
+          new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+          environment));
+    }
+
+    @Test
+    public void testUnableToGetDefaultProject() throws Exception {
+      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+      DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+      when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
+      assertNull(projectFactory.create(PipelineOptionsFactory.create()));
+    }
+
+    @Test
+    public void testEmptyGcpTempLocation() throws Exception {
+      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+      options.setGcpCredential(new TestCredential());
+      options.setProject("");
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("--project is a required option");
+      options.getGcpTempLocation();
+    }
+
+    @Test
+    public void testDefaultGcpTempLocation() throws Exception {
+      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+      String tempLocation = "gs://bucket";
+      options.setTempLocation(tempLocation);
+      options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
+      assertEquals(tempLocation, options.getGcpTempLocation());
+    }
+
+    @Test
+    public void testDefaultGcpTempLocationInvalid() throws Exception {
+      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+      options.setTempLocation("file://");
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage(
+          "Error constructing default value for gcpTempLocation: tempLocation is not"
+              + " a valid GCS path");
+      options.getGcpTempLocation();
+    }
+
+    @Test
+    public void testDefaultGcpTempLocationDoesNotExist() {
+      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+      String tempLocation = "gs://does/not/exist";
+      options.setTempLocation(tempLocation);
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage(
+          "Error constructing default value for gcpTempLocation: tempLocation is not"
+              + " a valid GCS path");
+      thrown.expectCause(
+          hasMessage(containsString("Output path does not exist or is not writeable")));
+
+      options.getGcpTempLocation();
+    }
+
+    private static void makePropertiesFileWithProject(File path, String projectId)
+        throws IOException {
+      String properties = String.format("[core]%n"
+          + "account = test-account@google.com%n"
+          + "project = %s%n"
+          + "%n"
+          + "[dataflow]%n"
+          + "magic = true%n", projectId);
+      Files.write(properties, path, StandardCharsets.UTF_8);
+    }
+
+    private static String runGetProjectTest(File path, Map<String, String> environment)
+        throws Exception {
+      makePropertiesFileWithProject(path, "test-project");
+      DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+      when(projectFactory.getEnvironment()).thenReturn(environment);
+      return projectFactory.create(PipelineOptionsFactory.create());
+    }
+  }
+
+  /** Tests related to determining the GCP temp location. */
+  @RunWith(JUnit4.class)
+  public static class GcpTempLocation {
+    @Rule public ExpectedException thrown = ExpectedException.none();
+    @Mock private GcsUtil mockGcsUtil;
+    @Mock private CloudResourceManager mockCrmClient;
+    @Mock private Projects mockProjects;
+    @Mock private Get mockGet;
+    private Project fakeProject;
+    private PipelineOptions options;
+
+    @Before
+    public void setUp() throws Exception {
+      MockitoAnnotations.initMocks(this);
+      options = PipelineOptionsFactory.create();
+      options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
+      options.as(GcpOptions.class).setProject("foo");
+      options.as(GcpOptions.class).setZone("us-north1-a");
+      when(mockCrmClient.projects()).thenReturn(mockProjects);
+      when(mockProjects.get(any(String.class))).thenReturn(mockGet);
+      fakeProject = new Project().setProjectNumber(1L);
+    }
+
+    @Test
+    public void testCreateBucket() throws Exception {
+      doReturn(fakeProject).when(mockGet).execute();
+      when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L);
+
+      String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+      assertEquals("gs://dataflow-staging-us-north1-1", bucket);
+    }
+
+    @Test
+    public void testCreateBucketProjectLookupFails() throws Exception {
+      doThrow(new IOException("badness")).when(mockGet).execute();
+
+      thrown.expect(RuntimeException.class);
+      thrown.expectMessage("Unable to verify project");
+      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+    }
+
+    @Test
+    public void testCreateBucketCreateBucketFails() throws Exception {
+      doReturn(fakeProject).when(mockGet).execute();
+      doThrow(new IOException("badness")).when(
+          mockGcsUtil).createBucket(any(String.class), any(Bucket.class));
+
+      thrown.expect(RuntimeException.class);
+      thrown.expectMessage("Unable create default bucket");
+      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+    }
+
+    @Test
+    public void testCannotGetBucketOwner() throws Exception {
+      doReturn(fakeProject).when(mockGet).execute();
+      when(mockGcsUtil.bucketOwner(any(GcsPath.class)))
+          .thenThrow(new IOException("badness"));
+
+      thrown.expect(RuntimeException.class);
+      thrown.expectMessage("Unable to determine the owner");
+      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+    }
+
+    @Test
+    public void testProjectMismatch() throws Exception {
+      doReturn(fakeProject).when(mockGet).execute();
+      when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("Bucket owner does not match the project");
+      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+    }
+
+    @Test
+    public void regionFromZone() throws Exception {
+      assertEquals("us-central1", GcpTempLocationFactory.getRegionFromZone("us-central1-a"));
+      assertEquals("asia-east", GcpTempLocationFactory.getRegionFromZone("asia-east-a"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
new file mode 100644
index 0000000..67d5880
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Delete;
+import com.google.api.services.storage.Storage;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GoogleApiDebugOptions.GoogleApiTracer;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.Transport;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GoogleApiDebugOptions}. */
+@RunWith(JUnit4.class)
+public class GoogleApiDebugOptionsTest {
+  private static final String STORAGE_GET_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
+  private static final String STORAGE_GET_AND_LIST_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\","
+      + "\"Objects.List\":\"ListTraceDestination\"}";
+  private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}";
+
+  @Test
+  public void testWhenTracingMatches() throws Exception {
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.Get request =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("GetTraceDestination", request.get("$trace"));
+  }
+
+  @Test
+  public void testWhenTracingDoesNotMatch() throws Exception {
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.List request =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertNull(request.get("$trace"));
+  }
+
+  @Test
+  public void testWithMultipleTraces() throws Exception {
+    String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("GetTraceDestination", getRequest.get("$trace"));
+
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("ListTraceDestination", listRequest.get("$trace"));
+  }
+
+  @Test
+  public void testMatchingAllCalls() throws Exception {
+    String[] args = new String[] {STORAGE_TRACE};
+    GcsOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("TraceDestination", getRequest.get("$trace"));
+
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("TraceDestination", listRequest.get("$trace"));
+  }
+
+  @Test
+  public void testMatchingAgainstClient() throws Exception {
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+        Transport.newStorageClient(options).build(), "TraceDestination"));
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("TraceDestination", getRequest.get("$trace"));
+
+    Delete deleteRequest = GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient(
+        options.as(CloudResourceManagerOptions.class))
+        .build().projects().delete("testProjectId");
+    assertNull(deleteRequest.get("$trace"));
+  }
+
+  @Test
+  public void testMatchingAgainstRequestType() throws Exception {
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+        Transport.newStorageClient(options).build().objects()
+            .get("aProjectId", "aObjectId"), "TraceDestination"));
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("TraceDestination", getRequest.get("$trace"));
+
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertNull(listRequest.get("$trace"));
+  }
+
+  @Test
+  public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception {
+    String serializedValue = "{\"Api\":\"Token\"}";
+    ObjectMapper objectMapper = new ObjectMapper();
+    assertEquals(serializedValue,
+        objectMapper.writeValueAsString(
+            objectMapper.readValue(serializedValue, GoogleApiTracer.class)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
new file mode 100644
index 0000000..a29dd45
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsIOChannelFactoryRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryRegistrarTest {
+
+  @Test
+  public void testServiceLoader() {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+      if (registrar instanceof GcsIOChannelFactoryRegistrar) {
+        return;
+      }
+    }
+    fail("Expected to find " + GcsIOChannelFactoryRegistrar.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
new file mode 100644
index 0000000..f53490a
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GcsIOChannelFactory}. */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryTest {
+  private GcsIOChannelFactory factory;
+
+  @Before
+  public void setUp() {
+    factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class));
+  }
+
+  @Test
+  public void testResolve() throws Exception {
+    assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..65fb228
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests covering {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private GcsPathValidator pathValidator;
+
+  /** Wires a validator to a mocked {@link GcsUtil} that reports every bucket as accessible. */
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    GcsOptions gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    gcsOptions.setGcpCredential(new TestCredential());
+    gcsOptions.setGcsUtil(mockGcsUtil);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+    pathValidator = GcsPathValidator.fromOptions(gcsOptions);
+  }
+
+  /** Expects an {@link IllegalArgumentException} whose message contains {@code message}. */
+  private void expectIllegalArgument(String message) {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(message);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    pathValidator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectIllegalArgument("Expected a valid 'gs://' path but was given '/local/path'");
+    pathValidator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testFilePatternMissingBucket() {
+    expectIllegalArgument(
+        "Missing object or bucket in path: 'gs://input/', "
+            + "did you mean: 'gs://some-bucket/input'?");
+    pathValidator.validateInputFilePatternSupported("gs://input");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    // Replace the setUp() stubbing so the mocked bucket check reports "not accessible".
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+    expectIllegalArgument("Could not find file gs://non-existent-bucket/location");
+    pathValidator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    pathValidator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectIllegalArgument("Expected a valid 'gs://' path but was given '/local/path'");
+    pathValidator.validateOutputFilePrefixSupported("/local/path");
+  }
+
+  @Test
+  public void testOutputPrefixMissingBucket() {
+    expectIllegalArgument(
+        "Missing object or bucket in path: 'gs://output/', "
+            + "did you mean: 'gs://some-bucket/output'?");
+    pathValidator.validateOutputFilePrefixSupported("gs://output");
+  }
+}