You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/18 23:19:29 UTC

[3/7] beam git commit: [BEAM-1871] Create new GCP core module package and move several GCP related classes from beam-sdks-java-core over.

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
new file mode 100644
index 0000000..2187e7d
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.AppEngineEnvironment;
+import org.apache.beam.sdk.util.GcsPathValidator;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+
+/**
+ * Pipeline options that configure how the SDK interacts with Google Cloud Storage (GCS).
+ */
+public interface GcsOptions extends
+    ApplicationNameOptions, GcpOptions, PipelineOptions {
+  /**
+   * The {@link GcsUtil} used to talk to Google Cloud Storage. Unless set explicitly, the
+   * instance is supplied by {@link GcsUtil.GcsUtilFactory}.
+   */
+  @JsonIgnore
+  @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
+  @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
+  @Hidden
+  GcsUtil getGcsUtil();
+  void setGcsUtil(GcsUtil value);
+
+  /**
+   * The {@link ExecutorService} used to create threads. It may be overridden with an
+   * executor that is compatible with the user's environment. Unless set explicitly, the
+   * instance is supplied by {@link ExecutorServiceFactory}, which builds a pool with an
+   * unbounded number of threads; that default is compatible with Google AppEngine.
+   */
+  @JsonIgnore
+  @Description("The ExecutorService instance to use to create multiple threads. Can be overridden "
+      + "to specify an ExecutorService that is compatible with the users environment. If unset, "
+      + "the default is to create an ExecutorService with an unbounded number of threads; this "
+      + "is compatible with Google AppEngine.")
+  @Default.InstanceFactory(ExecutorServiceFactory.class)
+  @Hidden
+  ExecutorService getExecutorService();
+  void setExecutorService(ExecutorService value);
+
+  /**
+   * The URL of the GCS API endpoint. When left unspecified, the default endpoint is used.
+   */
+  @JsonIgnore
+  @Hidden
+  @Description("The URL for the GCS API.")
+  String getGcsEndpoint();
+  void setGcsEndpoint(String value);
+
+  /**
+   * The size, in bytes, of the buffer used when uploading files to GCS; may be {@code null}.
+   * See {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for the restrictions on
+   * this value and its performance implications.
+   */
+  @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the "
+      + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
+      + "information on the restrictions and performance implications of this value.\n\n"
+      + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
+      + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
+  @Nullable
+  Integer getGcsUploadBufferSizeBytes();
+  void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
+
+  /**
+   * The {@link PathValidator} implementation to instantiate when no validator instance has
+   * been set explicitly through {@link #setPathValidator}. Defaults to
+   * {@link GcsPathValidator}.
+   */
+  @Description("The class of the validator that should be created and used to validate paths. "
+      + "If pathValidator has not been set explicitly, an instance of this class will be "
+      + "constructed and used as the path validator.")
+  @Default.Class(GcsPathValidator.class)
+  Class<? extends PathValidator> getPathValidatorClass();
+  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+  /**
+   * The {@link PathValidator} instance used to validate paths. Unless set explicitly, the
+   * instance is supplied by {@link PathValidatorFactory}, which builds it from the class
+   * currently returned by {@link #getPathValidatorClass()}.
+   */
+  @JsonIgnore
+  @Description("The path validator instance that should be used to validate paths. "
+      + "If no path validator has been set explicitly, the default is to use the instance factory "
+      + "that constructs a path validator based upon the currently set pathValidatorClass.")
+  @Default.InstanceFactory(PathValidatorFactory.class)
+  PathValidator getPathValidator();
+  void setPathValidator(PathValidator validator);
+
+  /**
+   * Supplies the default {@link ExecutorService} used within the Apache Beam SDK: an
+   * unbounded, AppEngine-compatible thread pool.
+   */
+  class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
+    @SuppressWarnings("deprecation")  // IS_APP_ENGINE is deprecated for internal use only.
+    @Override
+    public ExecutorService create(PipelineOptions options) {
+      ThreadFactoryBuilder builder =
+          new ThreadFactoryBuilder().setThreadFactory(MoreExecutors.platformThreadFactory());
+      // AppEngine doesn't allow modification of threads to be daemon threads, so daemon
+      // threads are only requested outside of AppEngine.
+      boolean daemonThreadsAllowed = !AppEngineEnvironment.IS_APP_ENGINE;
+      if (daemonThreadsAllowed) {
+        builder.setDaemon(true);
+      }
+
+      // The pool has to be unbounded: a step may create X writers, each of which needs its
+      // own thread to perform its writes; otherwise a writer whose buffer is full may block
+      // and deadlock the step. In addition, the MapTaskExecutor launches the steps in
+      // reverse order and completes them in forward order, so there must be enough threads
+      // for every step's writers to be active.
+      int corePoolSize = 0;
+      int maximumPoolSize = Integer.MAX_VALUE;  // Unlimited number of re-usable threads.
+      long keepAliveTime = Long.MAX_VALUE;      // Non-core threads are kept alive forever.
+      return new ThreadPoolExecutor(
+          corePoolSize,
+          maximumPoolSize,
+          keepAliveTime,
+          TimeUnit.NANOSECONDS,
+          new SynchronousQueue<Runnable>(),
+          builder.build());
+    }
+  }
+
+  /**
+   * Builds a {@link PathValidator} by invoking the {@code fromOptions} factory method of the
+   * class returned by {@link #getPathValidatorClass()}.
+   */
+  class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+    @Override
+    public PathValidator create(PipelineOptions options) {
+      Class<? extends PathValidator> validatorClass =
+          options.as(GcsOptions.class).getPathValidatorClass();
+      return InstanceBuilder.ofType(PathValidator.class)
+          .fromClass(validatorClass)
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
new file mode 100644
index 0000000..f9cb575
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.google.api.client.googleapis.services.AbstractGoogleClient;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.googleapis.services.GoogleClientRequestInitializer;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * These options configure debug settings for Google API clients created within the Apache Beam SDK.
+ */
+public interface GoogleApiDebugOptions extends PipelineOptions {
+  /**
+   * This option enables tracing of API calls to Google services used within the Apache
+   * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...}
+   * </code> where the {@code ApiName} represents the request class's canonical name. The
+   * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported.
+   * Typically, "producer" is the right destination to use: this makes API traces available to the
+   * team offering the API. Note that by enabling this option, the contents of the requests to and
+   * from Google Cloud services will be made available to Google. For example, by specifying
+   * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available
+   * to Google, specifically to the Google Cloud Dataflow team.
+   */
+  @Description("This option enables tracing of API calls to Google services used within the Apache "
+      + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} "
+      + "where the ApiName represents the request classes canonical name. The TraceDestination is "
+      + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is "
+      + "the right destination to use: this makes API traces available to the team offering the "
+      + "API. Note that by enabling this option, the contents of the requests to and from "
+      + "Google Cloud services will be made available to Google. For example, by specifying "
+      + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to "
+      + "Google, specifically to the Google Cloud Dataflow team.")
+  GoogleApiTracer getGoogleApiTrace();
+  void setGoogleApiTrace(GoogleApiTracer commands);
+
+  /**
+   * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls.
+   *
+   * <p>The map keys are API name fragments and the values are trace destinations. A request is
+   * traced when the canonical name of its class contains one of the keys.
+   */
+  class GoogleApiTracer extends HashMap<String, String>
+      implements GoogleClientRequestInitializer {
+    /**
+     * Registers {@code traceDestination} under the canonical class name of the given client.
+     *
+     * @param client the client whose class name is used as the key
+     * @param traceDestination the trace destination to report matching calls to
+     * @return this tracer, so that registrations can be chained
+     */
+    public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) {
+      put(client.getClass().getCanonicalName(), traceDestination);
+      return this;
+    }
+
+    /**
+     * Registers {@code traceDestination} under the canonical class name of the given request.
+     *
+     * @param request the request whose class name is used as the key
+     * @param traceDestination the trace destination to report matching calls to
+     * @return this tracer, so that registrations can be chained
+     */
+    public GoogleApiTracer addTraceFor(
+        AbstractGoogleClientRequest<?> request, String traceDestination) {
+      put(request.getClass().getCanonicalName(), traceDestination);
+      return this;
+    }
+
+    /**
+     * Sets the {@code $trace} parameter on {@code request} for every registered key that is
+     * contained in the canonical name of the request's class.
+     *
+     * <p>Requests whose class has no canonical name (anonymous or local classes) and entries
+     * with a {@code null} key are skipped instead of failing with a
+     * {@link NullPointerException}.
+     */
+    @Override
+    public void initialize(AbstractGoogleClientRequest<?> request) throws IOException {
+      // The request's class name does not change while iterating, so resolve it once.
+      String requestName = request.getClass().getCanonicalName();
+      if (requestName == null) {
+        // Anonymous and local classes have no canonical name; no key can match them.
+        return;
+      }
+      for (Map.Entry<String, String> entry : this.entrySet()) {
+        String apiName = entry.getKey();
+        // HashMap permits a null key, which String#contains would reject with an NPE.
+        if (apiName != null && requestName.contains(apiName)) {
+          request.set("$trace", entry.getValue());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
new file mode 100644
index 0000000..b065d19
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+/**
+ * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
+ *
+ * <p>Combines the application name, GCP and streaming option groups with the
+ * Pub/Sub-specific settings declared here.
+ */
+@Description("Options that are used to configure Google Cloud Pub/Sub. See "
+    + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
+public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
+    PipelineOptions, StreamingOptions {
+
+  /**
+   * Root URL for use with the Google Cloud Pub/Sub API.
+   *
+   * <p>Defaults to {@code https://pubsub.googleapis.com}; the option is marked {@code @Hidden}.
+   */
+  @Description("Root URL for use with the Google Cloud Pub/Sub API")
+  @Default.String("https://pubsub.googleapis.com")
+  @Hidden
+  String getPubsubRootUrl();
+  void setPubsubRootUrl(String value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/package-info.java
new file mode 100644
index 0000000..465e742
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines {@link org.apache.beam.sdk.options.PipelineOptions} for
+ * configuring pipeline execution for Google Cloud Platform components.
+ */
+package org.apache.beam.sdk.options;

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
new file mode 100644
index 0000000..8f752c0
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.BigqueryScopes;
+import com.google.api.services.bigquery.model.QueryRequest;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Transport;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A matcher to verify data in BigQuery by processing given query
+ * and comparing with content's checksum.
+ *
+ * <p>The checksum is computed per row as the SHA-1 of the row's sorted cell values, and the
+ * row hashes are then combined without regard to row order.
+ *
+ * <p>Example:
+ * <pre>{@code
+ *   assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum));
+ * }</pre>
+ */
+@NotThreadSafe
+public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
+    implements SerializableMatcher<PipelineResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class);
+
+  // The maximum number of retries to execute a BigQuery RPC
+  static final int MAX_QUERY_RETRIES = 4;
+
+  // The initial backoff for executing a BigQuery RPC
+  private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L);
+
+  // The total number of rows in query response to be formatted for debugging purpose
+  private static final int TOTAL_FORMATTED_ROWS = 20;
+
+  // The backoff factory with initial configs
+  static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withMaxRetries(MAX_QUERY_RETRIES)
+          .withInitialBackoff(INITIAL_BACKOFF);
+
+  private final String applicationName;
+  private final String projectId;
+  private final String query;
+  private final String expectedChecksum;
+  // Checksum of the most recently fetched rows; set by matchesSafely.
+  private String actualChecksum;
+  // Response of the most recent query; set by matchesSafely and read when describing a mismatch.
+  private transient QueryResponse response;
+
+  /**
+   * Creates a matcher that runs {@code query} against {@code projectId} and compares the
+   * checksum of the returned rows with {@code expectedChecksum}.
+   *
+   * @throws IllegalArgumentException if any argument is {@code null} or empty
+   */
+  public BigqueryMatcher(
+      String applicationName, String projectId, String query, String expectedChecksum) {
+    validateArgument("applicationName", applicationName);
+    validateArgument("projectId", projectId);
+    validateArgument("query", query);
+    validateArgument("expectedChecksum", expectedChecksum);
+
+    this.applicationName = applicationName;
+    this.projectId = projectId;
+    this.query = query;
+    this.expectedChecksum = expectedChecksum;
+  }
+
+  /**
+   * Executes the query and reports whether the checksum of its rows equals the expected one.
+   *
+   * @return {@code false} if the query job is not complete or the checksums differ
+   * @throws RuntimeException if the query could not be executed
+   */
+  @Override
+  protected boolean matchesSafely(PipelineResult pipelineResult) {
+    LOG.info("Verifying Bigquery data");
+    Bigquery bigqueryClient = newBigqueryClient(applicationName);
+
+    // execute query
+    LOG.debug("Executing query: {}", query);
+    try {
+      QueryRequest queryContent = new QueryRequest();
+      queryContent.setQuery(query);
+
+      response = queryWithRetries(
+          bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
+    } catch (IOException | InterruptedException e) {
+      // Restore the interrupt flag for both flavours of interruption, so that callers up
+      // the stack can still observe it after the exception has been wrapped.
+      if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException("Failed to fetch BigQuery data.", e);
+    }
+
+    // getJobComplete() returns a boxed Boolean; a missing value is treated as "not complete".
+    if (!Boolean.TRUE.equals(response.getJobComplete())) {
+      // query job not complete, verification failed
+      return false;
+    }
+
+    // compute checksum
+    actualChecksum = generateHash(response.getRows());
+    LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
+
+    return expectedChecksum.equals(actualChecksum);
+  }
+
+  /** Builds a BigQuery client that uses the application default credential. */
+  @VisibleForTesting
+  Bigquery newBigqueryClient(String applicationName) {
+    HttpTransport transport = Transport.getTransport();
+    JsonFactory jsonFactory = Transport.getJsonFactory();
+    Credentials credential = getDefaultCredential();
+
+    return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential))
+        .setApplicationName(applicationName)
+        .build();
+  }
+
+  /**
+   * Runs the query, retrying after an {@link IOException} or a {@code null} response for as
+   * long as {@code backOff} allows.
+   *
+   * @return the first non-null response
+   * @throws RuntimeException once the backoff is exhausted, with the last failure as cause
+   * @throws InterruptedException if interrupted while sleeping between attempts
+   */
+  @Nonnull
+  @VisibleForTesting
+  QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
+                                 Sleeper sleeper, BackOff backOff)
+      throws IOException, InterruptedException {
+    IOException lastException = null;
+    do {
+      if (lastException != null) {
+        LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
+      }
+      try {
+        // Named differently from the field so that the field is not shadowed.
+        QueryResponse queryResponse =
+            bigqueryClient.jobs().query(projectId, queryContent).execute();
+        if (queryResponse != null) {
+          return queryResponse;
+        } else {
+          lastException =
+              new IOException("Expected valid response from query job, but received null.");
+        }
+      } catch (IOException e) {
+        // ignore and retry
+        lastException = e;
+      }
+    } while (BackOffUtils.next(sleeper, backOff));
+
+    throw new RuntimeException(
+        String.format(
+            "Unable to get BigQuery response after retrying %d times using query (%s)",
+            MAX_QUERY_RETRIES,
+            queryContent.getQuery()),
+        lastException);
+  }
+
+  /** Rejects {@code null} or empty constructor arguments. */
+  private void validateArgument(String name, String value) {
+    checkArgument(
+        !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value);
+  }
+
+  /** Loads the application default credential, scoping it for BigQuery when required. */
+  private Credentials getDefaultCredential() {
+    GoogleCredentials credential;
+    try {
+      credential = GoogleCredentials.getApplicationDefault();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to get application default credential.", e);
+    }
+
+    if (credential.createScopedRequired()) {
+      Collection<String> bigqueryScope =
+          Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY);
+      credential = credential.createScoped(bigqueryScope);
+    }
+    return credential;
+  }
+
+  /**
+   * Computes an order-insensitive checksum of {@code rows}.
+   *
+   * @param rows the rows of the query response; may be {@code null} or empty when the query
+   *     returned no data
+   * @return the combined hash, or an empty string when there are no rows. The empty string
+   *     can never equal the expected checksum, which is validated to be non-empty.
+   */
+  private String generateHash(List<TableRow> rows) {
+    if (rows == null || rows.isEmpty()) {
+      // Hashing.combineUnordered rejects an empty collection, so there is nothing to combine.
+      return "";
+    }
+    List<HashCode> rowHashes = Lists.newArrayList();
+    for (TableRow row : rows) {
+      List<String> cellsInOneRow = Lists.newArrayList();
+      for (TableCell cell : row.getF()) {
+        cellsInOneRow.add(Objects.toString(cell.getV()));
+      }
+      // Sorting once per row yields the same list as re-sorting after every insertion.
+      Collections.sort(cellsInOneRow);
+      rowHashes.add(
+          Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
+    }
+    return Hashing.combineUnordered(rowHashes).toString();
+  }
+
+  @Override
+  public void describeTo(Description description) {
+    description
+        .appendText("Expected checksum is (")
+        .appendText(expectedChecksum)
+        .appendText(")");
+  }
+
+  /**
+   * Describes why the last call to {@code matchesSafely} failed: either the query job had not
+   * completed, or the checksums differed (a sample of the rows is included in that case).
+   */
+  @Override
+  public void describeMismatchSafely(PipelineResult pResult, Description description) {
+    String info;
+    if (!Boolean.TRUE.equals(response.getJobComplete())) {
+      // query job not complete
+      info = String.format("The query job hasn't completed. Got response: %s", response);
+    } else {
+      // checksum mismatch
+      info = String.format("was (%s).%n"
+          + "\tTotal number of rows are: %d.%n"
+          + "\tQueried data details:%s",
+          actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS));
+    }
+    description.appendText(info);
+  }
+
+  /**
+   * Formats at most {@code totalNumRows} rows of the response, one row per line, followed by
+   * an ellipsis line when more rows exist. Returns an empty string when the response has no
+   * rows.
+   */
+  private String formatRows(int totalNumRows) {
+    StringBuilder samples = new StringBuilder();
+    List<TableRow> rows = response.getRows();
+    if (rows == null) {
+      // The response carries no rows at all; there is nothing to sample.
+      return samples.toString();
+    }
+    for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
+      samples.append(String.format("%n\t\t"));
+      for (TableCell field : rows.get(i).getF()) {
+        samples.append(String.format("%-10s", field.getV()));
+      }
+    }
+    if (rows.size() > totalNumRows) {
+      samples.append(String.format("%n\t\t..."));
+    }
+    return samples.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
new file mode 100644
index 0000000..1494026
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines utilities for unit testing Google Cloud Platform components of Apache Beam pipelines.
+ */
+package org.apache.beam.sdk.testing;

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
new file mode 100644
index 0000000..b0fcbd1
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.lang.reflect.InvocationTargetException;
+
+/** Records whether the current JVM is running inside AppEngine. */
+public class AppEngineEnvironment {
+  /**
+   * {@code true} when running inside of AppEngine, {@code false} otherwise. The value is
+   * determined once, when this class is initialized.
+   */
+  @Deprecated
+  public static final boolean IS_APP_ENGINE = isAppEngine();
+
+  /**
+   * Tries to determine whether the code is executing inside of AppEngine.
+   *
+   * <p>Purposely copied and left private from private <a href="https://code.google.com/p/
+   * guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/
+   * MoreExecutors.java#785">code.google.common.util.concurrent.MoreExecutors#isAppEngine</a>.
+   *
+   * @return true if we are inside of AppEngine, false otherwise.
+   */
+  static boolean isAppEngine() {
+    String runtime = System.getProperty("com.google.appengine.runtime.environment");
+    if (runtime == null) {
+      return false;
+    }
+    try {
+      Object currentEnvironment = Class.forName("com.google.apphosting.api.ApiProxy")
+          .getMethod("getCurrentEnvironment")
+          .invoke(null);
+      // A null current environment means we're not inside AppEngine.
+      return currentEnvironment != null;
+    } catch (ClassNotFoundException
+        | NoSuchMethodException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      // Every reflective failure means "not AppEngine": ApiProxy is absent, the method is
+      // missing or inaccessible (unsupported AppEngine version), or ApiProxy itself threw
+      // (not a proper AppEngine environment).
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
new file mode 100644
index 0000000..6229650
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+/**
+ * Construct an oauth credential to be used by the SDK and the SDK workers.
+ */
+public interface CredentialFactory {
+  /**
+   * Returns the {@link Credentials} produced by this factory.
+   *
+   * @return the credential to use
+   * @throws IOException declared so that implementations may report I/O failures
+   * @throws GeneralSecurityException declared so that implementations may report security
+   *     failures
+   */
+  Credentials getCredential() throws IOException, GeneralSecurityException;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
new file mode 100644
index 0000000..75954c0
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.google.api.services.storage.model.Bucket;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for handling default GCS buckets.
+ */
+public class DefaultBucket {
+  static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
+
+  /** Region used for the default bucket when the options do not specify a zone. */
+  static final String DEFAULT_REGION = "us-central1";
+
+  /**
+   * Creates a default bucket or verifies the existence and proper access control
+   * of an existing default bucket.  Returns the location if successful.
+   *
+   * <p>The bucket is named {@code dataflow-staging-<region>-<projectNumber>}, where the region
+   * is derived from the configured zone, or {@link #DEFAULT_REGION} when no zone is set.
+   *
+   * @param options the pipeline options, viewed as {@link GcsOptions} and
+   *     {@link CloudResourceManagerOptions}
+   * @return the bucket location in the form {@code gs://<bucketName>}
+   * @throws IllegalArgumentException if no project is configured, the zone is malformed, or the
+   *     bucket is owned by a different project
+   * @throws RuntimeException if looking up the project number, creating the bucket, or looking
+   *     up the bucket owner fails with an {@link IOException}
+   */
+  public static String tryCreateDefaultBucket(PipelineOptions options) {
+    GcsOptions gcpOptions = options.as(GcsOptions.class);
+
+    final String projectId = gcpOptions.getProject();
+    checkArgument(!isNullOrEmpty(projectId),
+                  "--project is a required option.");
+
+    // Look up the project number, to create a default bucket with a stable
+    // name with no special characters.
+    final long projectNumber;
+    try {
+      projectNumber = gcpOptions.as(CloudResourceManagerOptions.class)
+          .getGcpProjectUtil().getProjectNumber(projectId);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to verify project with ID " + projectId, e);
+    }
+    String region = DEFAULT_REGION;
+    if (!isNullOrEmpty(gcpOptions.getZone())) {
+      region = getRegionFromZone(gcpOptions.getZone());
+    }
+    final String bucketName =
+      "dataflow-staging-" + region + "-" + projectNumber;
+    LOG.info("No staging location provided, attempting to use default bucket: {}",
+             bucketName);
+    Bucket bucket = new Bucket()
+      .setName(bucketName)
+      .setLocation(region);
+    // Always try to create the bucket before checking access, so that we do not
+    // race with other pipelines that may be attempting to do the same thing.
+    try {
+      gcpOptions.getGcsUtil().createBucket(projectId, bucket);
+    } catch (FileAlreadyExistsException e) {
+      // Expected when the bucket was created earlier; ownership is verified below.
+      LOG.debug("Bucket '{}' already exists, verifying access.", bucketName);
+    } catch (IOException e) {
+      // Message text kept as-is: callers may match on it.
+      throw new RuntimeException("Unable create default bucket.", e);
+    }
+
+    // Once the bucket is expected to exist, verify that it is correctly owned
+    // by the project executing the job.
+    try {
+      long owner = gcpOptions.getGcsUtil().bucketOwner(
+        GcsPath.fromComponents(bucketName, ""));
+      checkArgument(
+        owner == projectNumber,
+        "Bucket owner does not match the project from --project:"
+        + " %s vs. %s", owner, projectNumber);
+    } catch (IOException e) {
+      throw new RuntimeException(
+        "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
+    }
+    return "gs://" + bucketName;
+  }
+
+  /**
+   * Returns the first two dash-separated components of {@code zone} joined by a dash,
+   * for example {@code us-central1} for {@code us-central1-a}.
+   *
+   * @throws IllegalArgumentException if {@code zone} has fewer than two dash-separated parts
+   */
+  @VisibleForTesting
+  static String getRegionFromZone(String zone) {
+    String[] zoneParts = zone.split("-");
+    checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
+    return zoneParts[0] + "-" + zoneParts[1];
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
new file mode 100644
index 0000000..e1fa18f
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A {@link CredentialFactory} that supplies the GCP application default credential
+ * to be used by the SDK and the SDK workers.
+ */
+public class GcpCredentialFactory implements CredentialFactory {
+  /**
+   * The scope cloud-platform provides access to all Cloud Platform resources.
+   * cloud-platform isn't sufficient yet for talking to datastore so we request
+   * those resources separately.
+   *
+   * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for
+   * services we access directly (GCS) as opposed to through the backend
+   * (BigQuery, GCE), we need to explicitly request that scope.
+   */
+  private static final List<String> SCOPES = Arrays.asList(
+      "https://www.googleapis.com/auth/cloud-platform",
+      "https://www.googleapis.com/auth/devstorage.full_control",
+      "https://www.googleapis.com/auth/userinfo.email",
+      "https://www.googleapis.com/auth/datastore",
+      "https://www.googleapis.com/auth/pubsub");
+
+  private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory();
+
+  /**
+   * Returns the shared factory instance; {@code options} is not consulted.
+   */
+  public static GcpCredentialFactory fromOptions(PipelineOptions options) {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns the application default GCP {@link Credentials} restricted to the scopes above,
+   * or null when they cannot be obtained.
+   */
+  @Override
+  public Credentials getCredential() {
+    Credentials scoped;
+    try {
+      scoped = GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
+    } catch (IOException e) {
+      // Deliberately ignored: pipelines that only access public data
+      // should be able to run without credentials.
+      scoped = null;
+    }
+    return scoped;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
new file mode 100644
index 0000000..f73afe0
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides operations on Google Cloud Platform Projects.
+ */
+public class GcpProjectUtil {
+  /**
+   * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using
+   * any transport flags specified on the {@link PipelineOptions}.
+   */
+  public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> {
+    /**
+     * Returns an instance of {@link GcpProjectUtil} based on the
+     * {@link PipelineOptions}.
+     */
+    @Override
+    public GcpProjectUtil create(PipelineOptions options) {
+      LOG.debug("Creating new GcpProjectUtil");
+      CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class);
+      return new GcpProjectUtil(
+          Transport.newCloudResourceManagerClient(crmOptions).build());
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class);
+
+  /** Retry policy for CRM lookups: at most 3 retries, starting at a 200 ms backoff. */
+  private static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+
+  /** Client for the CRM API. */
+  private CloudResourceManager crmClient;
+
+  private GcpProjectUtil(CloudResourceManager crmClient) {
+    this.crmClient = crmClient;
+  }
+
+  // Use this only for testing purposes.
+  @VisibleForTesting
+  void setCrmClient(CloudResourceManager crmClient) {
+    this.crmClient = crmClient;
+  }
+
+  /**
+   * Returns the project number or throws an exception if the project does not
+   * exist or has other access exceptions.
+   *
+   * @param projectId the ID of the project to look up
+   * @throws IOException if the lookup fails for any reason
+   */
+  public long getProjectNumber(String projectId) throws IOException {
+    return getProjectNumber(
+      projectId,
+      BACKOFF_FACTORY.backoff(),
+      Sleeper.DEFAULT);
+  }
+
+  /**
+   * Returns the project number or throws an error if the project does not
+   * exist or has other access errors.
+   *
+   * <p>The request is retried on socket errors according to {@code backoff}. Any failure,
+   * including an interruption while waiting between retries, is reported as an
+   * {@link IOException}; on interruption the thread's interrupt status is restored first.
+   *
+   * @param projectId the ID of the project to look up
+   * @param backoff the backoff policy governing retries
+   * @param sleeper the sleeper used to wait between retries
+   * @throws IOException if the lookup fails for any reason
+   */
+  @VisibleForTesting
+  long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException {
+    CloudResourceManager.Projects.Get getProject =
+        crmClient.projects().get(projectId);
+    try {
+      Project project = ResilientOperation.retry(
+          ResilientOperation.getGoogleRequestCallable(getProject),
+          backoff,
+          RetryDeterminer.SOCKET_ERRORS,
+          IOException.class,
+          sleeper);
+      return project.getProjectNumber();
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt status so that callers can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      throw new IOException("Unable to get project number", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
new file mode 100644
index 0000000..745dcb9
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * Implements IOChannelFactory for GCS by delegating to the {@link GcsUtil}
+ * held by the configured {@link GcsOptions}.
+ */
+public class GcsIOChannelFactory implements IOChannelFactory {
+
+  /**
+   * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}.
+   */
+  public static GcsIOChannelFactory fromOptions(PipelineOptions options) {
+    GcsOptions gcsOptions = options.as(GcsOptions.class);
+    return new GcsIOChannelFactory(gcsOptions);
+  }
+
+  private final GcsOptions options;
+
+  private GcsIOChannelFactory(GcsOptions options) {
+    this.options = options;
+  }
+
+  /** Expands {@code spec} and returns the string form of every matching path. */
+  @Override
+  public Collection<String> match(String spec) throws IOException {
+    GcsPath pattern = GcsPath.fromUri(spec);
+    List<GcsPath> expanded = options.getGcsUtil().expand(pattern);
+
+    List<String> results = new LinkedList<>();
+    for (GcsPath gcsPath : expanded) {
+      results.add(gcsPath.toString());
+    }
+    return results;
+  }
+
+  /** Opens the object named by {@code spec} for reading. */
+  @Override
+  public ReadableByteChannel open(String spec) throws IOException {
+    GcsPath gcsPath = GcsPath.fromUri(spec);
+    return options.getGcsUtil().open(gcsPath);
+  }
+
+  /** Creates the object named by {@code spec} with the given MIME type for writing. */
+  @Override
+  public WritableByteChannel create(String spec, String mimeType)
+      throws IOException {
+    GcsPath gcsPath = GcsPath.fromUri(spec);
+    return options.getGcsUtil().create(gcsPath, mimeType);
+  }
+
+  /** Returns the size reported by {@link GcsUtil} for the object named by {@code spec}. */
+  @Override
+  public long getSizeBytes(String spec) throws IOException {
+    GcsPath gcsPath = GcsPath.fromUri(spec);
+    return options.getGcsUtil().fileSize(gcsPath);
+  }
+
+  /** Always reports that seeking is efficient; see the TODO in the body. */
+  @Override
+  public boolean isReadSeekEfficient(String spec) throws IOException {
+    // TODO It is incorrect to return true here for files with content encoding set to gzip.
+    return true;
+  }
+
+  /** Resolves {@code other} against {@code path} and returns the result as a string. */
+  @Override
+  public String resolve(String path, String other) throws IOException {
+    Path base = toPath(path);
+    return base.resolve(other).toString();
+  }
+
+  /** Parses {@code path} as a {@link GcsPath}. */
+  @Override
+  public Path toPath(String path) {
+    return GcsPath.fromUri(path);
+  }
+
+  /** Copies each source file to the corresponding destination via {@link GcsUtil}. */
+  @Override
+  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
+    GcsUtil gcsUtil = options.getGcsUtil();
+    gcsUtil.copy(srcFilenames, destFilenames);
+  }
+
+  /** Removes the given files or directories via {@link GcsUtil}. */
+  @Override
+  public void remove(Collection<String> filesOrDirs) throws IOException {
+    GcsUtil gcsUtil = options.getGcsUtil();
+    gcsUtil.remove(filesOrDirs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
new file mode 100644
index 0000000..b4c457f
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for the {@link GcsIOChannelFactory}.
+ */
+@AutoService(IOChannelFactoryRegistrar.class)
+public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar {
+
+  /**
+   * Returns a {@link GcsIOChannelFactory} built from the given {@link PipelineOptions}.
+   */
+  @Override
+  public GcsIOChannelFactory fromOptions(PipelineOptions options) {
+    return GcsIOChannelFactory.fromOptions(options);
+  }
+
+  /**
+   * Returns the scheme this registrar handles, {@code "gs"}.
+   */
+  @Override
+  public String getScheme() {
+    return "gs";
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
new file mode 100644
index 0000000..a5b951d
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+  private final GcsOptions gcsOptions;
+
+  private GcsPathValidator(GcsOptions options) {
+    this.gcsOptions = options;
+  }
+
+  /**
+   * Creates a validator backed by the {@link GcsOptions} view of {@code options}.
+   */
+  public static GcsPathValidator fromOptions(PipelineOptions options) {
+    GcsOptions asGcsOptions = options.as(GcsOptions.class);
+    return new GcsPathValidator(asGcsOptions);
+  }
+
+  /**
+   * Validates that the input GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateInputFilePatternSupported(String filepattern) {
+    GcsPath parsed = getGcsPath(filepattern);
+    checkArgument(gcsOptions.getGcsUtil().isGcsPatternSupported(parsed.getObject()));
+    String resourceName = verifyPath(filepattern);
+    verifyPathIsAccessible(filepattern, "Could not find file %s");
+    return resourceName;
+  }
+
+  /**
+   * Validates that the output GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateOutputFilePrefixSupported(String filePrefix) {
+    String resourceName = verifyPath(filePrefix);
+    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+    return resourceName;
+  }
+
+  /**
+   * Checks that {@code path} is an absolute GCS path whose object name contains no
+   * consecutive slashes, and returns its resource name.
+   */
+  @Override
+  public String verifyPath(String path) {
+    GcsPath parsed = getGcsPath(path);
+    checkArgument(parsed.isAbsolute(), "Must provide absolute paths for Dataflow");
+    boolean hasConsecutiveSlashes = parsed.getObject().contains("//");
+    checkArgument(!hasConsecutiveSlashes,
+        "Dataflow Service does not allow objects with consecutive slashes");
+    return parsed.toResourceName();
+  }
+
+  /**
+   * Fails with {@code errorMessage} (formatted with {@code path}) unless the bucket of
+   * {@code path} is accessible.
+   */
+  private void verifyPathIsAccessible(String path, String errorMessage) {
+    GcsPath parsed = getGcsPath(path);
+    boolean accessible;
+    try {
+      accessible = gcsOptions.getGcsUtil().bucketAccessible(parsed);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Unable to verify that GCS bucket gs://%s exists.", parsed.getBucket()),
+          e);
+    }
+    checkArgument(accessible, errorMessage, path);
+  }
+
+  /**
+   * Parses {@code path} as a {@link GcsPath}, rewording the parse failure so that it
+   * names the offending path.
+   */
+  private GcsPath getGcsPath(String path) {
+    try {
+      return GcsPath.fromUri(path);
+    } catch (IllegalArgumentException e) {
+      String message = String.format(
+          "Expected a valid 'gs://' path but was given '%s'", path);
+      throw new IllegalArgumentException(message, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
new file mode 100644
index 0000000..1c853bb
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -0,0 +1,798 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
+import com.google.api.client.googleapis.json.GoogleJsonError;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
+import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.ClientRequestHelper;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides operations on GCS.
+ */
+public class GcsUtil {
+  /**
+   * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using
+   * any transport flags specified on the {@link PipelineOptions}.
+   */
+  public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
+    /**
+     * Builds a {@link GcsUtil} from the storage client, executor service and upload
+     * buffer size configured on the {@link GcsOptions} view of {@code options}.
+     *
+     * <p>If no instance has previously been created, one is created and the value
+     * stored in {@code options}.
+     */
+    @Override
+    public GcsUtil create(PipelineOptions options) {
+      LOG.debug("Creating new GcsUtil");
+      GcsOptions asGcsOptions = options.as(GcsOptions.class);
+      Storage.Builder clientBuilder = Transport.newStorageClient(asGcsOptions);
+      Storage client = clientBuilder.build();
+      return new GcsUtil(
+          client,
+          clientBuilder.getHttpRequestInitializer(),
+          asGcsOptions.getExecutorService(),
+          asGcsOptions.getGcsUploadBufferSizeBytes());
+    }
+
+    /**
+     * Builds a {@link GcsUtil} directly from the supplied collaborators.
+     */
+    public static GcsUtil create(
+        Storage storageClient,
+        HttpRequestInitializer httpRequestInitializer,
+        ExecutorService executorService,
+        @Nullable Integer uploadBufferSizeBytes) {
+      GcsUtil gcsUtil = new GcsUtil(
+          storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
+      return gcsUtil;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
+
+  /** Maximum number of items to retrieve per Objects.List request. */
+  private static final long MAX_LIST_ITEMS_PER_CALL = 1024;
+
+  /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
+  private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
+
+  private static final String RECURSIVE_WILDCARD = "[*]{2}";
+
+  /**
+   * A {@link Pattern} for globs with a recursive wildcard.
+   */
+  private static final Pattern RECURSIVE_GCS_PATTERN =
+      Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");
+
+  /**
+   * Maximum number of requests permitted in a GCS batch request.
+   */
+  private static final int MAX_REQUESTS_PER_BATCH = 100;
+  /**
+   * Maximum number of concurrent batches of requests executing on GCS.
+   */
+  private static final int MAX_CONCURRENT_BATCHES = 256;
+
+  private static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Client for the GCS API. */
+  private Storage storageClient;
+  private final HttpRequestInitializer httpRequestInitializer;
+  /** Buffer size for GCS uploads (in bytes). */
+  @Nullable private final Integer uploadBufferSizeBytes;
+
+  // Helper delegate for turning IOExceptions from API calls into higher-level semantics.
+  private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+
+  // Exposed for testing.
+  final ExecutorService executorService;
+
+  /**
+   * Checks that the given GCS pattern does not use a recursive wildcard.
+   *
+   * @return true if the pattern is supported; never returns false
+   * @throws IllegalArgumentException if the pattern contains a recursive wildcard
+   */
+  public static boolean isGcsPatternSupported(String gcsPattern) {
+    boolean hasRecursiveWildcard = RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches();
+    if (!hasRecursiveWildcard) {
+      return true;
+    }
+    throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": "
+        + " recursive wildcards are not supported.");
+  }
+
+  /**
+   * Returns the prefix portion of the glob that doesn't contain wildcards.
+   *
+   * @param globExp the glob expression to inspect
+   * @return the part of {@code globExp} that precedes its first wildcard character
+   * @throws IllegalArgumentException if {@code globExp} uses a recursive wildcard or
+   *     contains no wildcard at all
+   */
+  public static String getGlobPrefix(String globExp) {
+    checkArgument(isGcsPatternSupported(globExp));
+    Matcher m = GLOB_PREFIX.matcher(globExp);
+    // Pass the template and argument separately so the message is only built on failure.
+    checkArgument(
+        m.matches(),
+        "Glob expression: [%s] is not expandable.", globExp);
+    return m.group("PREFIX");
+  }
+
+  /**
+   * Translates a glob expression into an equivalent regular expression.
+   *
+   * <p>{@code *} becomes {@code [^/]*}, {@code ?} becomes {@code [^/]}, regex metacharacters
+   * are backslash-escaped, a backslash is handed to {@code doubleSlashes}, and every other
+   * character (including {@code [} and {@code ]}) is copied through unchanged.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  public static String globToRegexp(String globExp) {
+    // Characters that are literal in a glob but need escaping in a regular expression.
+    final String regexMetaChars = ".+{}()|^$";
+    char[] glob = globExp.toCharArray();
+    StringBuilder regex = new StringBuilder();
+    int pos = 0;
+    while (pos < glob.length) {
+      char current = glob[pos];
+      pos++;
+      if (current == '*') {
+        regex.append("[^/]*");
+      } else if (current == '?') {
+        regex.append("[^/]");
+      } else if (current == '\\') {
+        // The helper may consume the escaped character and returns the next index to read.
+        pos = doubleSlashes(regex, glob, pos);
+      } else if (regexMetaChars.indexOf(current) >= 0) {
+        regex.append('\\').append(current);
+      } else {
+        regex.append(current);
+      }
+    }
+    return regex.toString();
+  }
+
+  /**
+   * Tells whether the object name of {@code spec} contains a glob wildcard character.
+   *
+   * @param spec the GCS path whose object component is inspected
+   * @return {@code true} if the object name matches {@code GLOB_PREFIX}
+   */
+  public static boolean isGlob(GcsPath spec) {
+    String objectName = spec.getObject();
+    return GLOB_PREFIX.matcher(objectName).matches();
+  }
+
+  /**
+   * Private constructor that stores the supplied collaborators in the corresponding fields.
+   *
+   * @param storageClient client for the GCS API
+   * @param httpRequestInitializer initializer used when creating batch requests
+   * @param executorService executor handed to write channels
+   * @param uploadBufferSizeBytes upload buffer size in bytes, or {@code null} to leave it unset
+   */
+  private GcsUtil(
+      Storage storageClient,
+      HttpRequestInitializer httpRequestInitializer,
+      ExecutorService executorService,
+      @Nullable Integer uploadBufferSizeBytes) {
+    this.storageClient = storageClient;
+    this.httpRequestInitializer = httpRequestInitializer;
+    this.uploadBufferSizeBytes = uploadBufferSizeBytes;
+    this.executorService = executorService;
+  }
+
+  // Use this only for testing purposes.
+  // Replaces the GCS client used by all subsequent requests issued through this instance.
+  protected void setStorageClient(Storage storageClient) {
+    this.storageClient = storageClient;
+  }
+
+  /**
+   * Expands a pattern into the list of matching paths.
+   *
+   * <p>A pattern without wildcards is resolved with a single metadata lookup and yields either
+   * that one path or an empty list when the object does not exist. A glob is resolved by
+   * listing every object under the glob's literal prefix, page by page, and keeping the names
+   * that match the glob and do not end with a slash.
+   *
+   * @param gcsPattern the path or glob to expand
+   * @return the matching paths, possibly empty
+   * @throws IOException if a GCS request fails
+   * @throws IllegalArgumentException if the pattern uses an unsupported wildcard
+   */
+  public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
+    checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
+
+    if (!isGlob(gcsPattern)) {
+      // Not a glob: a get request fetches the object's metadata; only its success matters.
+      // The request has strong global consistency.
+      try {
+        getObject(gcsPattern);
+      } catch (FileNotFoundException e) {
+        // If the path was not found, return an empty list.
+        return ImmutableList.of();
+      }
+      return ImmutableList.of(gcsPattern);
+    }
+
+    // Part before the first wildcard character.
+    String prefix = getGlobPrefix(gcsPattern.getObject());
+    Pattern p = Pattern.compile(globToRegexp(gcsPattern.getObject()));
+
+    LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
+        prefix, p.toString());
+
+    List<GcsPath> results = new LinkedList<>();
+    String pageToken = null;
+    while (true) {
+      Objects page = listObjects(gcsPattern.getBucket(), prefix, pageToken);
+      List<StorageObject> items = page.getItems();
+      if (items == null) {
+        break;
+      }
+
+      // Keep only the objects whose names match the glob.
+      for (StorageObject candidate : items) {
+        String name = candidate.getName();
+        // Skip directories, which end with a slash.
+        if (name.endsWith("/") || !p.matcher(name).matches()) {
+          continue;
+        }
+        LOG.debug("Matched object: {}", name);
+        results.add(GcsPath.fromObject(candidate));
+      }
+
+      pageToken = page.getNextPageToken();
+      if (pageToken == null) {
+        break;
+      }
+    }
+
+    return results;
+  }
+
+  /** Returns the configured upload buffer size in bytes, or {@code null} if none was given. */
+  @VisibleForTesting
+  @Nullable
+  Integer getUploadBufferSizeBytes() {
+    return uploadBufferSizeBytes;
+  }
+
+  /**
+   * Looks up the size of a single GCS object.
+   *
+   * @param path the object to inspect
+   * @return the size reported by the object's metadata
+   * @throws FileNotFoundException if the resource does not exist
+   * @throws IOException if the metadata lookup fails for another reason
+   */
+  public long fileSize(GcsPath path) throws IOException {
+    StorageObject metadata = getObject(path);
+    return metadata.getSize().longValue();
+  }
+
+  /**
+   * Returns the {@link StorageObject} for the given {@link GcsPath}.
+   *
+   * <p>Delegates to the three-argument overload with a fresh backoff from
+   * {@code BACKOFF_FACTORY} and {@code Sleeper.DEFAULT}.
+   *
+   * @throws IOException if the lookup fails, including {@link FileNotFoundException} when the
+   *     object does not exist
+   */
+  public StorageObject getObject(GcsPath gcsPath) throws IOException {
+    return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+  }
+
+  /**
+   * Fetches the metadata of {@code gcsPath}, retrying socket errors according to
+   * {@code backoff} and pausing between attempts with {@code sleeper}.
+   *
+   * @throws FileNotFoundException if GCS reports that the object does not exist
+   * @throws IOException for any other failure, or if the thread is interrupted while retrying
+   *     (the interrupt flag is restored before throwing)
+   */
+  @VisibleForTesting
+  StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
+    Storage.Objects.Get request =
+        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
+    try {
+      return ResilientOperation.retry(
+          ResilientOperation.getGoogleRequestCallable(request),
+          backoff,
+          RetryDeterminer.SOCKET_ERRORS,
+          IOException.class,
+          sleeper);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          String.format("Unable to get the file object for path %s.", gcsPath),
+          e);
+    } catch (IOException e) {
+      if (errorExtractor.itemNotFound(e)) {
+        throw new FileNotFoundException(gcsPath.toString());
+      }
+      throw new IOException(
+          String.format("Unable to get the file object for path %s.", gcsPath),
+          e);
+    }
+  }
+
+  /**
+   * Looks up the metadata of several objects using batched get requests.
+   *
+   * @param gcsPaths the objects to look up
+   * @return one {@link StorageObjectOrIOException} per path, in the order of {@code gcsPaths}
+   * @throws IOException if executing the batches fails
+   */
+  public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
+      throws IOException {
+    // Each single-element array is a slot that a batch callback fills in.
+    List<StorageObjectOrIOException[]> slots = new ArrayList<>();
+    List<BatchRequest> batches = makeGetBatches(gcsPaths, slots);
+    executeBatches(batches);
+
+    ImmutableList.Builder<StorageObjectOrIOException> collected = ImmutableList.builder();
+    for (StorageObjectOrIOException[] slot : slots) {
+      collected.add(slot[0]);
+    }
+    return collected.build();
+  }
+
+  /**
+   * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}.
+   *
+   * <p>At most {@code MAX_LIST_ITEMS_PER_CALL} items are requested per call. Socket errors are
+   * retried with a fresh backoff from {@code BACKOFF_FACTORY}.
+   *
+   * @param bucket the bucket to list
+   * @param prefix only objects whose names start with this prefix are listed
+   * @param pageToken token of the page to fetch, or {@code null} for the first page
+   * @return the requested page of objects
+   * @throws IOException if the request fails, or if the thread is interrupted while retrying
+   *     (the interrupt flag is restored before throwing)
+   */
+  public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
+      throws IOException {
+    // List all objects that start with the prefix (including objects in sub-directories).
+    Storage.Objects.List listObject = storageClient.objects().list(bucket);
+    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
+    listObject.setPrefix(prefix);
+
+    if (pageToken != null) {
+      listObject.setPageToken(pageToken);
+    }
+
+    try {
+      return ResilientOperation.retry(
+          ResilientOperation.getGoogleRequestCallable(listObject),
+          BACKOFF_FACTORY.backoff(),
+          RetryDeterminer.SOCKET_ERRORS,
+          IOException.class);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Wrapping the exception must not hide the interruption from callers further up.
+        Thread.currentThread().interrupt();
+      }
+      throw new IOException(
+          String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix),
+          e);
+    }
+  }
+
+  /**
+   * Returns the sizes of the given GCS objects, one entry per path and in the same order.
+   *
+   * @param paths the objects to inspect
+   * @return the size of each object
+   * @throws IOException the failure recorded for the first path whose lookup did not succeed,
+   *     for example a {@link FileNotFoundException} when the resource does not exist
+   */
+  @VisibleForTesting
+  List<Long> fileSizes(List<GcsPath> paths) throws IOException {
+    List<StorageObjectOrIOException> lookups = getObjects(paths);
+    ImmutableList.Builder<Long> sizes = ImmutableList.builder();
+    for (StorageObjectOrIOException lookup : lookups) {
+      Long size = toFileSize(lookup);
+      sizes.add(size);
+    }
+    return sizes.build();
+  }
+
+  /**
+   * Unwraps a lookup result into a file size, rethrowing the recorded failure if there is one.
+   */
+  private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException)
+      throws IOException {
+    IOException failure = storageObjectOrIOException.ioException();
+    if (failure != null) {
+      throw failure;
+    }
+    return storageObjectOrIOException.storageObject().getSize().longValue();
+  }
+
+  /**
+   * Opens an object in GCS.
+   *
+   * <p>Returns a SeekableByteChannel that provides access to data in the bucket. The channel
+   * is a {@code GoogleCloudStorageReadChannel} built on this instance's storage client and
+   * error extractor.
+   *
+   * @param path the GCS filename to read from
+   * @return a SeekableByteChannel that can read the object data
+   * @throws IOException if the read channel cannot be created
+   */
+  public SeekableByteChannel open(GcsPath path)
+      throws IOException {
+    return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(),
+            path.getObject(), errorExtractor,
+            new ClientRequestHelper<StorageObject>());
+  }
+
+  /**
+   * Creates an object in GCS.
+   *
+   * <p>Returns a WritableByteChannel that can be used to write data to the
+   * object.
+   *
+   * @param path the GCS file to write to
+   * @param type the type of object, eg "text/plain".
+   * @return an initialized WritableByteChannel that writes to the object
+   * @throws IOException if the write channel cannot be initialized
+   */
+  public WritableByteChannel create(GcsPath path,
+      String type) throws IOException {
+    GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel(
+        executorService,
+        storageClient,
+        new ClientRequestHelper<StorageObject>(),
+        path.getBucket(),
+        path.getObject(),
+        AsyncWriteChannelOptions.newBuilder().build(),
+        new ObjectWriteConditions(),
+        Collections.<String, String>emptyMap(),
+        type);
+    // Only set the buffer size when one was configured for this instance.
+    if (uploadBufferSizeBytes != null) {
+      channel.setUploadBufferSize(uploadBufferSizeBytes);
+    }
+    channel.initialize();
+    return channel;
+  }
+
+  /**
+   * Returns whether the GCS bucket exists and is accessible.
+   *
+   * <p>Delegates to the three-argument overload with a fresh backoff from
+   * {@code BACKOFF_FACTORY} and {@code Sleeper.DEFAULT}.
+   *
+   * @param path a path whose bucket component is checked
+   * @throws IOException if the check fails for a reason other than a missing or forbidden bucket
+   */
+  public boolean bucketAccessible(GcsPath path) throws IOException {
+    return bucketAccessible(
+        path,
+        BACKOFF_FACTORY.backoff(),
+        Sleeper.DEFAULT);
+  }
+
+  /**
+   * Returns the project number of the project which owns the bucket of {@code path}.
+   *
+   * <p>The bucket must exist and be accessible: a permissions failure or a missing bucket is
+   * propagated to the caller as an exception.
+   *
+   * @param path a path whose bucket component is looked up
+   * @return the owning project's number
+   * @throws IOException if the bucket cannot be fetched
+   */
+  public long bucketOwner(GcsPath path) throws IOException {
+    Bucket bucket = getBucket(path, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+    return bucket.getProjectNumber().longValue();
+  }
+
+  /**
+   * Creates a {@link Bucket} under the specified project in Cloud Storage or
+   * propagates an exception.
+   *
+   * <p>Delegates to the four-argument overload with a fresh backoff from
+   * {@code BACKOFF_FACTORY} and {@code Sleeper.DEFAULT}.
+   *
+   * @param projectId the project that will own the bucket
+   * @param bucket the bucket to create
+   * @throws IOException if the bucket cannot be created
+   */
+  public void createBucket(String projectId, Bucket bucket) throws IOException {
+    createBucket(
+        projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+  }
+
+  /**
+   * Returns whether the GCS bucket of {@code path} exists.
+   *
+   * <p>A bucket that is missing, or that is inaccessible due to permissions, yields
+   * {@code false}; any other failure is propagated.
+   *
+   * @throws IOException if the lookup fails for a reason other than a missing or forbidden bucket
+   */
+  @VisibleForTesting
+  boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+    Bucket bucket;
+    try {
+      bucket = getBucket(path, backoff, sleeper);
+    } catch (AccessDeniedException | FileNotFoundException e) {
+      return false;
+    }
+    return bucket != null;
+  }
+
+  /**
+   * Fetches the bucket of {@code path}, retrying socket errors according to {@code backoff}.
+   *
+   * @throws AccessDeniedException if GCS denies access to the bucket
+   * @throws FileNotFoundException if GCS reports that the bucket does not exist
+   * @throws IOException for any other failure, or if the thread is interrupted while retrying
+   */
+  @VisibleForTesting
+  @Nullable
+  Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+    Storage.Buckets.Get request = storageClient.buckets().get(path.getBucket());
+
+    // "Not found" and "access denied" are final answers; only socket errors are worth retrying.
+    RetryDeterminer<IOException> retryPolicy =
+        new RetryDeterminer<IOException>() {
+          @Override
+          public boolean shouldRetry(IOException e) {
+            boolean finalAnswer = errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e);
+            return !finalAnswer && RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+          }
+        };
+
+    try {
+      return ResilientOperation.retry(
+          ResilientOperation.getGoogleRequestCallable(request),
+          backoff,
+          retryPolicy,
+          IOException.class,
+          sleeper);
+    } catch (GoogleJsonResponseException e) {
+      if (errorExtractor.accessDenied(e)) {
+        throw new AccessDeniedException(path.toString(), null, e.getMessage());
+      }
+      if (errorExtractor.itemNotFound(e)) {
+        throw new FileNotFoundException(e.getMessage());
+      }
+      throw e;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          String.format("Error while attempting to verify existence of bucket gs://%s",
+              path.getBucket()), e);
+    }
+  }
+
+  /**
+   * Creates {@code bucket} under {@code projectId} with the {@code projectPrivate} predefined
+   * ACL for both the bucket and its default object ACL, retrying socket errors according to
+   * {@code backoff}.
+   *
+   * @param projectId the project that will own the bucket
+   * @param bucket the bucket to create
+   * @param backoff retry schedule for transient failures
+   * @param sleeper used to pause between retries
+   * @throws AccessDeniedException if GCS denies the request
+   * @throws FileAlreadyExistsException if a bucket with that name already exists
+   * @throws IOException for any other failure, or if the thread is interrupted while retrying
+   */
+  @VisibleForTesting
+  void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
+        throws IOException {
+    Storage.Buckets.Insert insertBucket =
+        storageClient.buckets().insert(projectId, bucket);
+    insertBucket.setPredefinedAcl("projectPrivate");
+    insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
+
+    try {
+      ResilientOperation.retry(
+        ResilientOperation.getGoogleRequestCallable(insertBucket),
+        backoff,
+        new RetryDeterminer<IOException>() {
+          @Override
+          public boolean shouldRetry(IOException e) {
+            // "Already exists" and "access denied" will not change on retry.
+            if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) {
+              return false;
+            }
+            return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+          }
+        },
+        IOException.class,
+        sleeper);
+    } catch (GoogleJsonResponseException e) {
+      if (errorExtractor.accessDenied(e)) {
+        throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
+      }
+      if (errorExtractor.itemAlreadyExists(e)) {
+        throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage());
+      }
+      throw e;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(
+        String.format("Error while attempting to create bucket gs://%s for project %s",
+                      bucket.getName(), projectId), e);
+    }
+  }
+
+  /**
+   * Runs the given batches on a dedicated pool of {@code MAX_CONCURRENT_BATCHES} threads and
+   * waits for all of them to finish. The pool is shut down before this method returns.
+   *
+   * @throws FileNotFoundException if a batch failed with that exception
+   * @throws IOException if a batch failed for another reason or the wait was interrupted
+   */
+  private static void executeBatches(List<BatchRequest> batches) throws IOException {
+    ThreadPoolExecutor pool =
+        new ThreadPoolExecutor(
+            MAX_CONCURRENT_BATCHES,
+            MAX_CONCURRENT_BATCHES,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>());
+    ListeningExecutorService executor =
+        MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(pool));
+
+    List<ListenableFuture<Void>> pending = new LinkedList<>();
+    for (final BatchRequest batch : batches) {
+      Callable<Void> task =
+          new Callable<Void>() {
+            public Void call() throws IOException {
+              batch.execute();
+              return null;
+            }
+          };
+      pending.add(executor.submit(task));
+    }
+
+    try {
+      Futures.allAsList(pending).get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while executing batch GCS request", e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof FileNotFoundException) {
+        throw (FileNotFoundException) cause;
+      }
+      throw new IOException("Error executing batch GCS request", e);
+    } finally {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Builds the {@link BatchRequest BatchRequests} that fetch metadata for {@code paths}, with
+   * at most {@code MAX_REQUESTS_PER_BATCH} requests in each batch.
+   *
+   * @param paths {@link GcsPath GcsPaths} to look up.
+   * @param results mutable {@link List} that receives one single-element holder per path, in
+   *     path order; each holder is filled in when its batch is executed.
+   * @return {@link BatchRequest BatchRequests} to execute.
+   * @throws IOException if a request cannot be queued
+   */
+  @VisibleForTesting
+  List<BatchRequest> makeGetBatches(
+      Collection<GcsPath> paths,
+      List<StorageObjectOrIOException[]> results) throws IOException {
+    List<BatchRequest> batches = new LinkedList<>();
+    List<GcsPath> allPaths = Lists.newArrayList(paths);
+    for (List<GcsPath> chunk : Lists.partition(allPaths, MAX_REQUESTS_PER_BATCH)) {
+      BatchRequest batch = createBatchRequest();
+      for (GcsPath path : chunk) {
+        StorageObjectOrIOException[] holder = enqueueGetFileSize(path, batch);
+        results.add(holder);
+      }
+      batches.add(batch);
+    }
+    return batches;
+  }
+
+  /**
+   * Copies each source file to the destination at the same position, using batched requests.
+   *
+   * @param srcFilenames URIs of the objects to copy
+   * @param destFilenames URIs of the copy targets; must contain as many entries as
+   *     {@code srcFilenames}
+   * @throws IOException if a copy fails
+   */
+  public void copy(Iterable<String> srcFilenames,
+                   Iterable<String> destFilenames) throws
+      IOException {
+    executeBatches(makeCopyBatches(srcFilenames, destFilenames));
+  }
+
+  /**
+   * Builds the batches that copy each source to the destination at the same index, starting
+   * a new batch whenever the current one reaches {@code MAX_REQUESTS_PER_BATCH} requests.
+   *
+   * @throws IllegalArgumentException if the two sequences differ in length
+   * @throws IOException if a request cannot be queued
+   */
+  List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
+    List<String> sources = Lists.newArrayList(srcFilenames);
+    List<String> destinations = Lists.newArrayList(destFilenames);
+    checkArgument(
+        sources.size() == destinations.size(),
+        "Number of source files %s must equal number of destination files %s",
+        sources.size(),
+        destinations.size());
+
+    List<BatchRequest> batches = new LinkedList<>();
+    BatchRequest current = createBatchRequest();
+    int pairCount = sources.size();
+    for (int index = 0; index < pairCount; index++) {
+      GcsPath from = GcsPath.fromUri(sources.get(index));
+      GcsPath to = GcsPath.fromUri(destinations.get(index));
+      enqueueCopy(from, to, current);
+      if (current.size() < MAX_REQUESTS_PER_BATCH) {
+        continue;
+      }
+      // The batch is full: seal it and start the next one.
+      batches.add(current);
+      current = createBatchRequest();
+    }
+    // Keep the trailing, partially filled batch unless it is empty.
+    if (current.size() > 0) {
+      batches.add(current);
+    }
+    return batches;
+  }
+
+  /**
+   * Builds the batches that delete {@code filenames}, with at most
+   * {@code MAX_REQUESTS_PER_BATCH} delete requests in each batch.
+   *
+   * @throws IOException if a request cannot be queued
+   */
+  List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException {
+    List<BatchRequest> batches = new LinkedList<>();
+    List<String> allFiles = Lists.newArrayList(filenames);
+    for (List<String> chunk : Lists.partition(allFiles, MAX_REQUESTS_PER_BATCH)) {
+      BatchRequest batch = createBatchRequest();
+      for (String filename : chunk) {
+        GcsPath target = GcsPath.fromUri(filename);
+        enqueueDelete(target, batch);
+      }
+      batches.add(batch);
+    }
+    return batches;
+  }
+
+  /**
+   * Deletes the given objects using batched requests.
+   *
+   * @param filenames URIs of the objects to delete
+   * @throws IOException if a delete fails
+   */
+  public void remove(Collection<String> filenames) throws IOException {
+    executeBatches(makeRemoveBatches(filenames));
+  }
+
+  /**
+   * Queues a metadata get request for {@code path} on {@code batch}.
+   *
+   * @return a single-element array that the batch callback fills with either the fetched
+   *     object or the failure; the element stays {@code null} until the callback runs
+   */
+  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch)
+      throws IOException {
+    final StorageObjectOrIOException[] holder = new StorageObjectOrIOException[1];
+
+    Storage.Objects.Get request =
+        storageClient.objects().get(path.getBucket(), path.getObject());
+    JsonBatchCallback<StorageObject> callback =
+        new JsonBatchCallback<StorageObject>() {
+          @Override
+          public void onSuccess(StorageObject response, HttpHeaders httpHeaders)
+              throws IOException {
+            holder[0] = StorageObjectOrIOException.create(response);
+          }
+
+          @Override
+          public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException {
+            // A missing object is reported with the dedicated exception type.
+            IOException failure =
+                errorExtractor.itemNotFound(e)
+                    ? new FileNotFoundException(path.toString())
+                    : new IOException(String.format("Error trying to get %s: %s", path, e));
+            holder[0] = StorageObjectOrIOException.create(failure);
+          }
+        };
+    request.queue(batch, callback);
+    return holder;
+  }
+
+  /**
+   * A class that holds either a {@link StorageObject} or an {@link IOException}.
+   *
+   * <p>Instances built through the two {@code create} factories carry exactly one non-null
+   * value: each factory null-checks its argument and passes {@code null} for the other slot.
+   */
+  @AutoValue
+  public abstract static class StorageObjectOrIOException {
+
+    /**
+     * Returns the {@link StorageObject}, or {@code null} if this instance holds an exception.
+     */
+    @Nullable
+    public abstract StorageObject storageObject();
+
+    /**
+     * Returns the {@link IOException}, or {@code null} if this instance holds an object.
+     */
+    @Nullable
+    public abstract IOException ioException();
+
+    /** Wraps a successful result; {@code storageObject} must not be {@code null}. */
+    @VisibleForTesting
+    public static StorageObjectOrIOException create(StorageObject storageObject) {
+      return new AutoValue_GcsUtil_StorageObjectOrIOException(
+          checkNotNull(storageObject, "storageObject"),
+          null /* ioException */);
+    }
+
+    /** Wraps a failure; {@code ioException} must not be {@code null}. */
+    @VisibleForTesting
+    public static StorageObjectOrIOException create(IOException ioException) {
+      return new AutoValue_GcsUtil_StorageObjectOrIOException(
+          null /* storageObject */,
+          checkNotNull(ioException, "ioException"));
+    }
+  }
+
+  /**
+   * Queues a request on {@code batch} that copies {@code from} to {@code to}. Success is
+   * logged at debug level; a failure reported to the callback is raised as an
+   * {@link IOException}.
+   */
+  private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch)
+      throws IOException {
+    Storage.Objects.Copy request =
+        storageClient
+            .objects()
+            .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null);
+    JsonBatchCallback<StorageObject> callback =
+        new JsonBatchCallback<StorageObject>() {
+          @Override
+          public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
+            LOG.debug("Successfully copied {} to {}", from, to);
+          }
+
+          @Override
+          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders)
+              throws IOException {
+            String message = String.format("Error trying to copy %s to %s: %s", from, to, e);
+            throw new IOException(message);
+          }
+        };
+    request.queue(batch, callback);
+  }
+
+  /**
+   * Queues a request on {@code batch} that deletes {@code file}. Success is logged at debug
+   * level; a failure reported to the callback is raised as an {@link IOException}.
+   */
+  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException {
+    Storage.Objects.Delete request =
+        storageClient.objects().delete(file.getBucket(), file.getObject());
+    JsonBatchCallback<Void> callback =
+        new JsonBatchCallback<Void>() {
+          @Override
+          public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+            LOG.debug("Successfully deleted {}", file);
+          }
+
+          @Override
+          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders)
+              throws IOException {
+            String message = String.format("Error trying to delete %s: %s", file, e);
+            throw new IOException(message);
+          }
+        };
+    request.queue(batch, callback);
+  }
+
+  /** Creates an empty batch on the storage client, using this instance's request initializer. */
+  private BatchRequest createBatchRequest() {
+    return storageClient.batch(httpRequestInitializer);
+  }
+
+  /**
+   * Translates a glob backslash escape into its regular-expression form.
+   *
+   * <p>Appends a backslash followed by the character being escaped, so that character keeps
+   * no special glob meaning. A backslash that is the last character of the glob has nothing
+   * to escape and is emitted as an escaped backslash instead.
+   *
+   * @param dst the regular expression being built
+   * @param src the characters of the glob
+   * @param i index of the character that follows the backslash being processed; equal to
+   *     {@code src.length} when the backslash is the last character
+   * @return the index of the next glob character to process
+   */
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append('\\');
+    if (i < src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+}