You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/18 23:19:30 UTC

[4/7] beam git commit: [BEAM-1871] Create new GCP core module package and move several GCP related classes from beam-sdks-java-core over.

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
deleted file mode 100644
index 23f0418..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.cloudresourcemanager.model.Project;
-import java.net.SocketTimeoutException;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Test case for {@link GcpProjectUtil}. */
-@RunWith(JUnit4.class)
-public class GcpProjectUtilTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private static CloudResourceManagerOptions crmOptionsWithTestCredential() {
-    CloudResourceManagerOptions pipelineOptions =
-        PipelineOptionsFactory.as(CloudResourceManagerOptions.class);
-    pipelineOptions.setGcpCredential(new TestCredential());
-    return pipelineOptions;
-  }
-
-  @Test
-  public void testGetProjectNumber() throws Exception {
-    CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential();
-    GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil();
-
-    CloudResourceManager.Projects mockProjects = Mockito.mock(
-        CloudResourceManager.Projects.class);
-    CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class);
-    projectUtil.setCrmClient(mockCrm);
-
-    CloudResourceManager.Projects.Get mockProjectsGet =
-        Mockito.mock(CloudResourceManager.Projects.Get.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-    Project project = new Project();
-    project.setProjectNumber(5L);
-
-    when(mockCrm.projects()).thenReturn(mockProjects);
-    when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet);
-    when(mockProjectsGet.execute())
-      .thenThrow(new SocketTimeoutException("SocketException"))
-      .thenReturn(project);
-
-    assertEquals(5L, projectUtil.getProjectNumber(
-        "foo", mockBackOff, new FastNanoClockAndSleeper()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
deleted file mode 100644
index a29dd45..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import java.util.ServiceLoader;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link GcsIOChannelFactoryRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class GcsIOChannelFactoryRegistrarTest {
-
-  @Test
-  public void testServiceLoader() {
-    for (IOChannelFactoryRegistrar registrar
-        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
-      if (registrar instanceof GcsIOChannelFactoryRegistrar) {
-        return;
-      }
-    }
-    fail("Expected to find " + GcsIOChannelFactoryRegistrar.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
deleted file mode 100644
index 7248b38..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link GcsIOChannelFactoryTest}. */
-@RunWith(JUnit4.class)
-public class GcsIOChannelFactoryTest {
-  private GcsIOChannelFactory factory;
-
-  @Before
-  public void setUp() {
-    factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class));
-  }
-
-  @Test
-  public void testResolve() throws Exception {
-    assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
deleted file mode 100644
index dc36319..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link GcsPathValidator}. */
-@RunWith(JUnit4.class)
-public class GcsPathValidatorTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Mock private GcsUtil mockGcsUtil;
-  private GcsPathValidator validator;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
-    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-    options.setGcsUtil(mockGcsUtil);
-    validator = GcsPathValidator.fromOptions(options);
-  }
-
-  @Test
-  public void testValidFilePattern() {
-    validator.validateInputFilePatternSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidFilePattern() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateInputFilePatternSupported("/local/path");
-  }
-
-  @Test
-  public void testWhenBucketDoesNotExist() throws Exception {
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Could not find file gs://non-existent-bucket/location");
-    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
-  }
-
-  @Test
-  public void testValidOutputPrefix() {
-    validator.validateOutputFilePrefixSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidOutputPrefix() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateOutputFilePrefixSupported("/local/path");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
deleted file mode 100644
index 03668ce..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpStatusCodes;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.LowLevelHttpRequest;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.json.Json;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.HttpTesting;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.client.util.BackOff;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Bucket;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
-import com.google.cloud.hadoop.util.ClientRequestHelper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigInteger;
-import java.net.SocketTimeoutException;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.AccessDeniedException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Test case for {@link GcsUtil}. */
-@RunWith(JUnit4.class)
-public class GcsUtilTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testGlobTranslation() {
-    assertEquals("foo", GcsUtil.globToRegexp("foo"));
-    assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o"));
-    assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?"));
-    assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*"));
-  }
-
-  private static GcsOptions gcsOptionsWithTestCredential() {
-    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
-    pipelineOptions.setGcpCredential(new TestCredential());
-    return pipelineOptions;
-  }
-
-  @Test
-  public void testCreationWithDefaultOptions() {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    assertNotNull(pipelineOptions.getGcpCredential());
-  }
-
-  @Test
-  public void testUploadBufferSizeDefault() {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil util = pipelineOptions.getGcsUtil();
-    assertNull(util.getUploadBufferSizeBytes());
-  }
-
-  @Test
-  public void testUploadBufferSizeUserSpecified() {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    pipelineOptions.setGcsUploadBufferSizeBytes(12345);
-    GcsUtil util = pipelineOptions.getGcsUtil();
-    assertEquals((Integer) 12345, util.getUploadBufferSizeBytes());
-  }
-
-  @Test
-  public void testCreationWithExecutorServiceProvided() {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    pipelineOptions.setExecutorService(Executors.newCachedThreadPool());
-    assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService);
-  }
-
-  @Test
-  public void testCreationWithGcsUtilProvided() {
-    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
-    GcsUtil gcsUtil = Mockito.mock(GcsUtil.class);
-    pipelineOptions.setGcsUtil(gcsUtil);
-    assertSame(gcsUtil, pipelineOptions.getGcsUtil());
-  }
-
-  @Test
-  public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception {
-    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
-    ExecutorService executorService = pipelineOptions.getExecutorService();
-
-    int numThreads = 100;
-    final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      final int currentLatch = i;
-      countDownLatches[i] = new CountDownLatch(1);
-      executorService.execute(
-          new Runnable() {
-            @Override
-            public void run() {
-              // Wait for latch N and then release latch N - 1
-              try {
-                countDownLatches[currentLatch].await();
-                if (currentLatch > 0) {
-                  countDownLatches[currentLatch - 1].countDown();
-                }
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-              }
-            }
-          });
-    }
-
-    // Release the last latch starting the chain reaction.
-    countDownLatches[countDownLatches.length - 1].countDown();
-    executorService.shutdown();
-    assertTrue("Expected tasks to complete",
-        executorService.awaitTermination(10, TimeUnit.SECONDS));
-  }
-
-  @Test
-  public void testGlobExpansion() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
-    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-    Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);
-
-    Objects modelObjects = new Objects();
-    List<StorageObject> items = new ArrayList<>();
-    // A directory
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
-
-    // Files within the directory
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name"));
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file2name"));
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name"));
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile"));
-
-    modelObjects.setItems(items);
-
-    when(mockStorage.objects()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")).thenReturn(
-        mockStorageGet);
-    when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
-    when(mockStorageGet.execute()).thenReturn(
-        new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
-    when(mockStorageList.execute()).thenReturn(modelObjects);
-
-    // Test a single file.
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
-      List<GcsPath> expectedFiles =
-          ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
-
-      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
-    }
-
-    // Test patterns.
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
-      List<GcsPath> expectedFiles = ImmutableList.of(
-          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
-      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
-    }
-
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
-      List<GcsPath> expectedFiles = ImmutableList.of(
-          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
-      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
-    }
-
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
-      List<GcsPath> expectedFiles = ImmutableList.of(
-          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
-      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
-    }
-
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
-      List<GcsPath> expectedFiles = ImmutableList.of(
-          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
-          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
-      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
-    }
-  }
-
-  // Patterns that contain recursive wildcards ('**') are not supported.
-  @Test
-  public void testRecursiveGlobExpansionFails() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-    GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**");
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Unsupported wildcard usage");
-    gcsUtil.expand(pattern);
-  }
-
-  // GCSUtil.expand() should fail when matching a single object when that object does not exist.
-  // We should return the empty result since GCS get object is strongly consistent.
-  @Test
-  public void testNonExistentObjectReturnsEmptyResult() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
-    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
-    GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile");
-    GoogleJsonResponseException expectedException =
-        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
-            "It don't exist", "Nothing here to see");
-
-    when(mockStorage.objects()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
-        mockStorageGet);
-    when(mockStorageGet.execute()).thenThrow(expectedException);
-
-    assertEquals(Collections.EMPTY_LIST, gcsUtil.expand(pattern));
-  }
-
-  // GCSUtil.expand() should fail for other errors such as access denied.
-  @Test
-  public void testAccessDeniedObjectThrowsIOException() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
-    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
-    GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile");
-    GoogleJsonResponseException expectedException =
-        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
-            "Waves hand mysteriously", "These aren't the buckets you're looking for");
-
-    when(mockStorage.objects()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
-        mockStorageGet);
-    when(mockStorageGet.execute()).thenThrow(expectedException);
-
-    thrown.expect(IOException.class);
-    thrown.expectMessage("Unable to get the file object for path");
-    gcsUtil.expand(pattern);
-  }
-
-  @Test
-  public void testFileSizeNonBatch() throws Exception {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
-    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
-    when(mockStorage.objects()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
-    when(mockStorageGet.execute()).thenReturn(
-            new StorageObject().setSize(BigInteger.valueOf(1000)));
-
-    assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")));
-  }
-
-  @Test
-  public void testFileSizeWhenFileNotFoundNonBatch() throws Exception {
-    MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse();
-    notFoundResponse.setContent("");
-    notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
-
-    MockHttpTransport mockTransport =
-            new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
-
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
-
-    thrown.expect(FileNotFoundException.class);
-    gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
-  }
-
-  @Test
-  public void testRetryFileSizeNonBatch() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
-    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
-
-    when(mockStorage.objects()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
-    when(mockStorageGet.execute())
-            .thenThrow(new SocketTimeoutException("SocketException"))
-            .thenThrow(new SocketTimeoutException("SocketException"))
-            .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
-
-    assertEquals(1000,
-        gcsUtil.getObject(
-            GcsPath.fromComponents("testbucket", "testobject"),
-            mockBackOff,
-            new FastNanoClockAndSleeper()).getSize().longValue());
-    assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception {
-    JsonFactory jsonFactory = new JacksonFactory();
-
-    String contentBoundary = "batch_foobarbaz";
-    String contentBoundaryLine = "--" + contentBoundary;
-    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
-
-    GenericJson error = new GenericJson()
-        .set("error", new GenericJson().set("code", 404));
-    error.setFactory(jsonFactory);
-
-    String content = contentBoundaryLine + "\n"
-        + "Content-Type: application/http\n"
-        + "\n"
-        + "HTTP/1.1 404 Not Found\n"
-        + "Content-Length: -1\n"
-        + "\n"
-        + error.toString()
-        + "\n"
-        + "\n"
-        + endOfContentBoundaryLine
-        + "\n";
-    thrown.expect(FileNotFoundException.class);
-    MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse()
-        .setContentType("multipart/mixed; boundary=" + contentBoundary)
-        .setContent(content)
-        .setStatusCode(HttpStatusCodes.STATUS_CODE_OK);
-
-    MockHttpTransport mockTransport =
-        new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
-
-    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
-    gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
-    gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
-  }
-
-  @Test
-  public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception {
-    JsonFactory jsonFactory = new JacksonFactory();
-
-    String contentBoundary = "batch_foobarbaz";
-    String contentBoundaryLine = "--" + contentBoundary;
-    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
-
-    GenericJson error = new GenericJson()
-        .set("error", new GenericJson().set("code", 404));
-    error.setFactory(jsonFactory);
-
-    String content = contentBoundaryLine + "\n"
-        + "Content-Type: application/http\n"
-        + "\n"
-        + "HTTP/1.1 404 Not Found\n"
-        + "Content-Length: -1\n"
-        + "\n"
-        + error.toString()
-        + "\n"
-        + "\n"
-        + endOfContentBoundaryLine
-        + "\n";
-    thrown.expect(FileNotFoundException.class);
-
-    final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class);
-    when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary);
-
-    // 429: Too many requests, then 200: OK.
-    when(mockResponse.getStatusCode()).thenReturn(429, 200);
-    when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content));
-
-    // A mock transport that lets us mock the API responses.
-    MockHttpTransport mockTransport =
-        new MockHttpTransport.Builder()
-            .setLowLevelHttpRequest(
-                new MockLowLevelHttpRequest() {
-                  @Override
-                  public LowLevelHttpResponse execute() throws IOException {
-                    return mockResponse;
-                  }
-                })
-            .build();
-
-    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
-        gcsUtil.setStorageClient(
-        new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
-    gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
-  }
-
-  @Test
-  public void testCreateBucket() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
-    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.insert(
-           any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
-    when(mockStorageInsert.execute())
-        .thenThrow(new SocketTimeoutException("SocketException"))
-        .thenReturn(new Bucket());
-
-    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
-  }
-
-  @Test
-  public void testCreateBucketAccessErrors() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
-    Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-    GoogleJsonResponseException expectedException =
-        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
-            "Waves hand mysteriously", "These aren't the buckets you're looking for");
-
-    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.insert(
-           any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
-    when(mockStorageInsert.execute())
-        .thenThrow(expectedException);
-
-    thrown.expect(AccessDeniedException.class);
-
-    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
-  }
-
-  @Test
-  public void testBucketAccessible() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
-    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
-    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
-    when(mockStorageGet.execute())
-        .thenThrow(new SocketTimeoutException("SocketException"))
-        .thenReturn(new Bucket());
-
-    assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
-        mockBackOff, new FastNanoClockAndSleeper()));
-  }
-
-  @Test
-  public void testBucketDoesNotExistBecauseOfAccessError() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
-    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-    GoogleJsonResponseException expectedException =
-        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
-            "Waves hand mysteriously", "These aren't the buckets you're looking for");
-
-    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
-    when(mockStorageGet.execute())
-        .thenThrow(expectedException);
-
-    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
-        mockBackOff, new FastNanoClockAndSleeper()));
-  }
-
-  @Test
-  public void testBucketDoesNotExist() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
-    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
-    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
-    when(mockStorageGet.execute())
-        .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
-            "It don't exist", "Nothing here to see"));
-
-    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
-        mockBackOff, new FastNanoClockAndSleeper()));
-  }
-
-  @Test
-  public void testGetBucket() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
-    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
-    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
-    when(mockStorageGet.execute())
-        .thenThrow(new SocketTimeoutException("SocketException"))
-        .thenReturn(new Bucket());
-
-    assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"),
-        mockBackOff, new FastNanoClockAndSleeper()));
-  }
-
-  @Test
-  public void testGetBucketNotExists() throws IOException {
-    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
-    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
-    Storage mockStorage = Mockito.mock(Storage.class);
-    gcsUtil.setStorageClient(mockStorage);
-
-    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
-    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
-    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
-    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
-    when(mockStorageGet.execute())
-        .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
-            "It don't exist", "Nothing here to see"));
-
-    thrown.expect(FileNotFoundException.class);
-    thrown.expectMessage("It don't exist");
-    gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"),
-                      mockBackOff, new FastNanoClockAndSleeper());
-  }
-
-  @Test
-  public void testGCSChannelCloseIdempotent() throws IOException {
-    SeekableByteChannel channel =
-        new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null,
-        new ClientRequestHelper<StorageObject>());
-    channel.close();
-    channel.close();
-  }
-
-  /**
-   * Builds a fake GoogleJsonResponseException for testing API error handling.
-   */
-  private static GoogleJsonResponseException googleJsonResponseException(
-      final int status, final String reason, final String message) throws IOException {
-    final JsonFactory jsonFactory = new JacksonFactory();
-    HttpTransport transport = new MockHttpTransport() {
-      @Override
-      public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
-        ErrorInfo errorInfo = new ErrorInfo();
-        errorInfo.setReason(reason);
-        errorInfo.setMessage(message);
-        errorInfo.setFactory(jsonFactory);
-        GenericJson error = new GenericJson();
-        error.set("code", status);
-        error.set("errors", Arrays.asList(errorInfo));
-        error.setFactory(jsonFactory);
-        GenericJson errorResponse = new GenericJson();
-        errorResponse.set("error", error);
-        errorResponse.setFactory(jsonFactory);
-        return new MockLowLevelHttpRequest().setResponse(
-            new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
-            .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
-        }
-    };
-    HttpRequest request =
-        transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
-    request.setThrowExceptionOnExecuteError(false);
-    HttpResponse response = request.execute();
-    return GoogleJsonResponseException.from(jsonFactory, response);
-  }
-
-  private static List<String> makeStrings(String s, int n) {
-    ImmutableList.Builder<String> ret = ImmutableList.builder();
-    for (int i = 0; i < n; ++i) {
-      ret.add(String.format("gs://bucket/%s%d", s, i));
-    }
-    return ret.build();
-  }
-
-  private static List<GcsPath> makeGcsPaths(String s, int n) {
-    ImmutableList.Builder<GcsPath> ret = ImmutableList.builder();
-    for (int i = 0; i < n; ++i) {
-      ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i)));
-    }
-    return ret.build();
-  }
-
-  private static int sumBatchSizes(List<BatchRequest> batches) {
-    int ret = 0;
-    for (BatchRequest b : batches) {
-      ret += b.size();
-      assertThat(b.size(), greaterThan(0));
-    }
-    return ret;
-  }
-
-  @Test
-  public void testMakeCopyBatches() throws IOException {
-    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
-    // Small number of files fits in 1 batch
-    List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3));
-    assertThat(batches.size(), equalTo(1));
-    assertThat(sumBatchSizes(batches), equalTo(3));
-
-    // 1 batch of files fits in 1 batch
-    batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100));
-    assertThat(batches.size(), equalTo(1));
-    assertThat(sumBatchSizes(batches), equalTo(100));
-
-    // A little more than 5 batches of files fits in 6 batches
-    batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501));
-    assertThat(batches.size(), equalTo(6));
-    assertThat(sumBatchSizes(batches), equalTo(501));
-  }
-
-  @Test
-  public void testInvalidCopyBatches() throws IOException {
-    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Number of source files 3");
-
-    gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1));
-  }
-
-  @Test
-  public void testMakeRemoveBatches() throws IOException {
-    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
-    // Small number of files fits in 1 batch
-    List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3));
-    assertThat(batches.size(), equalTo(1));
-    assertThat(sumBatchSizes(batches), equalTo(3));
-
-    // 1 batch of files fits in 1 batch
-    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
-    assertThat(batches.size(), equalTo(1));
-    assertThat(sumBatchSizes(batches), equalTo(100));
-
-    // A little more than 5 batches of files fits in 6 batches
-    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
-    assertThat(batches.size(), equalTo(6));
-    assertThat(sumBatchSizes(batches), equalTo(501));
-  }
-
-  @Test
-  public void testMakeGetBatches() throws IOException {
-    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
-    // Small number of files fits in 1 batch
-    List<StorageObjectOrIOException[]> results = Lists.newArrayList();
-    List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results);
-    assertThat(batches.size(), equalTo(1));
-    assertThat(sumBatchSizes(batches), equalTo(3));
-    assertEquals(3, results.size());
-
-    // 1 batch of files fits in 1 batch
-    results = Lists.newArrayList();
-    batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results);
-    assertThat(batches.size(), equalTo(1));
-    assertThat(sumBatchSizes(batches), equalTo(100));
-    assertEquals(100, results.size());
-
-    // A little more than 5 batches of files fits in 6 batches
-    results = Lists.newArrayList();
-    batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results);
-    assertThat(batches.size(), equalTo(6));
-    assertThat(sumBatchSizes(batches), equalTo(501));
-    assertEquals(501, results.size());
-  }
-
-  /**
-   * A helper to wrap a {@link GenericJson} object in a content stream.
-   */
-  private static InputStream toStream(String content) throws IOException {
-    return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
deleted file mode 100644
index 8e7878c..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link IntervalBoundedExponentialBackOff}. */
-@RunWith(JUnit4.class)
-public class IntervalBoundedExponentialBackOffTest {
-  @Rule public ExpectedException exception = ExpectedException.none();
-
-
-  @Test
-  public void testUsingInvalidInitialInterval() throws Exception {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Initial interval must be greater than zero.");
-    new IntervalBoundedExponentialBackOff(1000L, 0L);
-  }
-
-  @Test
-  public void testUsingInvalidMaximumInterval() throws Exception {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Maximum interval must be greater than zero.");
-    new IntervalBoundedExponentialBackOff(-1L, 10L);
-  }
-
-  @Test
-  public void testThatcertainNumberOfAttemptsReachesMaxInterval() throws Exception {
-    IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-  }
-
-  @Test
-  public void testThatResettingAllowsReuse() throws Exception {
-    IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-  }
-
-  @Test
-  public void testAtMaxInterval() throws Exception {
-    IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500);
-    assertFalse(backOff.atMaxInterval());
-    backOff.nextBackOffMillis();
-    assertFalse(backOff.atMaxInterval());
-    backOff.nextBackOffMillis();
-    assertTrue(backOff.atMaxInterval());
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
-        lessThanOrEqualTo(1500L)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
deleted file mode 100644
index 71554b5..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpResponseException;
-import com.google.api.client.http.HttpResponseInterceptor;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.LowLevelHttpRequest;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.Storage.Objects.Get;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.security.PrivateKey;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicLong;
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Tests for RetryHttpRequestInitializer.
- */
-@RunWith(JUnit4.class)
-public class RetryHttpRequestInitializerTest {
-
-  @Mock private Credential mockCredential;
-  @Mock private PrivateKey mockPrivateKey;
-  @Mock private LowLevelHttpRequest mockLowLevelRequest;
-  @Mock private LowLevelHttpResponse mockLowLevelResponse;
-  @Mock private HttpResponseInterceptor mockHttpResponseInterceptor;
-
-  private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
-  private Storage storage;
-
-  // Used to test retrying a request more than the default 10 times.
-  static class MockNanoClock implements NanoClock {
-    private int timesMs[] = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543,
-        12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420};
-    private int i = 0;
-
-    @Override
-    public long nanoTime() {
-      return timesMs[i++ / 2] * 1000000;
-    }
-  }
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    HttpTransport lowLevelTransport = new HttpTransport() {
-      @Override
-      protected LowLevelHttpRequest buildRequest(String method, String url)
-          throws IOException {
-        return mockLowLevelRequest;
-      }
-    };
-
-    // Retry initializer will pass through to credential, since we can have
-    // only a single HttpRequestInitializer, and we use multiple Credential
-    // types in the SDK, not all of which allow for retry configuration.
-    RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer(
-        mockCredential, new MockNanoClock(), new Sleeper() {
-          @Override
-          public void sleep(long millis) throws InterruptedException {}
-        }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor);
-    storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer)
-        .setApplicationName("test").build();
-  }
-
-  @After
-  public void tearDown() {
-    verifyNoMoreInteractions(mockPrivateKey);
-    verifyNoMoreInteractions(mockLowLevelRequest);
-    verifyNoMoreInteractions(mockCredential);
-    verifyNoMoreInteractions(mockHttpResponseInterceptor);
-  }
-
-  @Test
-  public void testBasicOperation() throws IOException {
-    when(mockLowLevelRequest.execute())
-        .thenReturn(mockLowLevelResponse);
-    when(mockLowLevelResponse.getStatusCode())
-        .thenReturn(200);
-
-    Storage.Buckets.Get result = storage.buckets().get("test");
-    HttpResponse response = result.executeUnparsed();
-    assertNotNull(response);
-
-    verify(mockCredential).initialize(any(HttpRequest.class));
-    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
-    verify(mockLowLevelRequest, atLeastOnce())
-        .addHeader(anyString(), anyString());
-    verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
-    verify(mockLowLevelRequest).execute();
-    verify(mockLowLevelResponse).getStatusCode();
-  }
-
-  /**
-   * Tests that a non-retriable error is not retried.
-   */
-  @Test
-  public void testErrorCodeForbidden() throws IOException {
-    when(mockLowLevelRequest.execute())
-        .thenReturn(mockLowLevelResponse);
-    when(mockLowLevelResponse.getStatusCode())
-        .thenReturn(403)  // Non-retryable error.
-        .thenReturn(200); // Shouldn't happen.
-
-    try {
-      Storage.Buckets.Get result = storage.buckets().get("test");
-      HttpResponse response = result.executeUnparsed();
-      assertNotNull(response);
-    } catch (HttpResponseException e) {
-      Assert.assertThat(e.getMessage(), Matchers.containsString("403"));
-    }
-
-    verify(mockCredential).initialize(any(HttpRequest.class));
-    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
-    verify(mockLowLevelRequest, atLeastOnce())
-        .addHeader(anyString(), anyString());
-    verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
-    verify(mockLowLevelRequest).execute();
-    verify(mockLowLevelResponse).getStatusCode();
-  }
-
-  /**
-   * Tests that a retriable error is retried.
-   */
-  @Test
-  public void testRetryableError() throws IOException {
-    when(mockLowLevelRequest.execute())
-        .thenReturn(mockLowLevelResponse)
-        .thenReturn(mockLowLevelResponse)
-        .thenReturn(mockLowLevelResponse);
-    when(mockLowLevelResponse.getStatusCode())
-        .thenReturn(503)  // Retryable
-        .thenReturn(429)  // We also retry on 429 Too Many Requests.
-        .thenReturn(200);
-
-    Storage.Buckets.Get result = storage.buckets().get("test");
-    HttpResponse response = result.executeUnparsed();
-    assertNotNull(response);
-
-    verify(mockCredential).initialize(any(HttpRequest.class));
-    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
-    verify(mockLowLevelRequest, atLeastOnce())
-        .addHeader(anyString(), anyString());
-    verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt());
-    verify(mockLowLevelRequest, times(3)).execute();
-    verify(mockLowLevelResponse, times(3)).getStatusCode();
-  }
-
-  /**
-   * Tests that an IOException is retried.
-   */
-  @Test
-  public void testThrowIOException() throws IOException {
-    when(mockLowLevelRequest.execute())
-        .thenThrow(new IOException("Fake Error"))
-        .thenReturn(mockLowLevelResponse);
-    when(mockLowLevelResponse.getStatusCode())
-        .thenReturn(200);
-
-    Storage.Buckets.Get result = storage.buckets().get("test");
-    HttpResponse response = result.executeUnparsed();
-    assertNotNull(response);
-
-    verify(mockCredential).initialize(any(HttpRequest.class));
-    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
-    verify(mockLowLevelRequest, atLeastOnce())
-        .addHeader(anyString(), anyString());
-    verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt());
-    verify(mockLowLevelRequest, times(2)).execute();
-    verify(mockLowLevelResponse).getStatusCode();
-  }
-
-  /**
-   * Tests that a retryable error is retried enough times.
-   */
-  @Test
-  public void testRetryableErrorRetryEnoughTimes() throws IOException {
-    when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse);
-    final int retries = 10;
-    when(mockLowLevelResponse.getStatusCode()).thenAnswer(new Answer<Integer>(){
-      int n = 0;
-      @Override
-      public Integer answer(InvocationOnMock invocation) {
-        return (n++ < retries - 1) ? 503 : 200;
-      }});
-
-    Storage.Buckets.Get result = storage.buckets().get("test");
-    HttpResponse response = result.executeUnparsed();
-    assertNotNull(response);
-
-    verify(mockCredential).initialize(any(HttpRequest.class));
-    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
-    verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(),
-        anyString());
-    verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt());
-    verify(mockLowLevelRequest, times(retries)).execute();
-    verify(mockLowLevelResponse, times(retries)).getStatusCode();
-  }
-
-  /**
-   * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler
-   * is invoked.
-   */
-  @Test
-  public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception {
-    // Counts the number of calls to execute the HTTP request.
-    final AtomicLong executeCount = new AtomicLong();
-
-    // 10 is a private internal constant in the Google API Client library. See
-    // com.google.api.client.http.HttpRequest#setNumberOfRetries
-    // TODO: update this test once the private internal constant is public.
-    final int defaultNumberOfRetries = 10;
-
-    // A mock HTTP request that always throws SocketTimeoutException.
-    MockHttpTransport transport =
-        new MockHttpTransport.Builder().setLowLevelHttpRequest(new MockLowLevelHttpRequest() {
-          @Override
-          public LowLevelHttpResponse execute() throws IOException {
-            executeCount.incrementAndGet();
-            throw new SocketTimeoutException("Fake forced timeout exception");
-          }
-        }).build();
-
-    // A sample HTTP request to Google Cloud Storage that uses both default Transport and default
-    // RetryHttpInitializer.
-    Storage storage = new Storage.Builder(
-        transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build();
-
-    Get getRequest = storage.objects().get("gs://fake", "file");
-
-    try {
-      getRequest.execute();
-      fail();
-    } catch (Throwable e) {
-      assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
-      assertEquals(1 + defaultNumberOfRetries, executeCount.get());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/pom.xml b/sdks/java/extensions/gcp-core/pom.xml
new file mode 100644
index 0000000..d566f94
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-extensions-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core</name>
+  <description>Common components used to support multiple
+  Google Cloud Platform specific maven modules.</description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludedGroups>
+            org.apache.beam.sdk.testing.NeedsRunner
+          </excludedGroups>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>true</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-jackson2</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.oauth-client</groupId>
+      <artifactId>google-oauth-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-oauth2-http</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>gcsio</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-cloudresourcemanager</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-pubsub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-storage</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-credentials</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
new file mode 100644
index 0000000..7672cd7
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+/**
+ * Properties needed when using Google BigQuery with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google BigQuery. See "
+    + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
+public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
+    PipelineOptions, StreamingOptions {
+  @Description("Temporary dataset for BigQuery table operations. "
+      + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
+  @Default.String("bigquery.googleapis.com/cloud_dataflow")
+  String getTempDatasetId();
+  void setTempDatasetId(String value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
new file mode 100644
index 0000000..13fdaf3
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.util.GcpProjectUtil;
+
+/**
+ * Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google CloudResourceManager. See "
+    + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
+public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
+    PipelineOptions, StreamingOptions {
+  /**
+   * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage.
+   */
+  @JsonIgnore
+  @Description("The GcpProjectUtil instance that should be used to communicate"
+               + " with Google Cloud Resource Manager.")
+  @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
+  @Hidden
+  GcpProjectUtil getGcpProjectUtil();
+  void setGcpProjectUtil(GcpProjectUtil value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
new file mode 100644
index 0000000..d01406f
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.auth.Credentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.CredentialFactory;
+import org.apache.beam.sdk.util.DefaultBucket;
+import org.apache.beam.sdk.util.GcpCredentialFactory;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Options used to configure Google Cloud Platform specific options such as the project
+ * and credentials.
+ *
+ * <p>These options defer to the
+ * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
+ * application default credentials</a> for authentication. See the
+ * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for
+ * alternative mechanisms for creating credentials.
+ */
+@Description("Options used to configure Google Cloud Platform project and credentials.")
+public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
+  /**
+   * Project id to use when launching jobs.
+   */
+  @Description("Project id. Required when using Google Cloud Platform services. "
+      + "See https://cloud.google.com/storage/docs/projects for further details.")
+  @Default.InstanceFactory(DefaultProjectFactory.class)
+  String getProject();
+  void setProject(String value);
+
+  /**
+   * GCP <a href="https://developers.google.com/compute/docs/zones"
+   * >availability zone</a> for operations.
+   *
+   * <p>Default is set on a per-service basis.
+   */
+  @Description("GCP availability zone for running GCP operations. "
+      + "Default is up to the individual service.")
+  String getZone();
+  void setZone(String value);
+
+  /**
+   * The class of the credential factory that should be created and used to create
+   * credentials. If gcpCredential has not been set explicitly, an instance of this class will
+   * be constructed and used as a credential factory.
+   */
+  @Description("The class of the credential factory that should be created and used to create "
+      + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
+      + "be constructed and used as a credential factory.")
+  @Default.Class(GcpCredentialFactory.class)
+  Class<? extends CredentialFactory> getCredentialFactoryClass();
+  void setCredentialFactoryClass(
+      Class<? extends CredentialFactory> credentialFactoryClass);
+
+  /**
+   * The credential instance that should be used to authenticate against GCP services.
+   * If no credential has been set explicitly, the default is to use the instance factory
+   * that constructs a credential based upon the currently set credentialFactoryClass.
+   */
+  @JsonIgnore
+  @Description("The credential instance that should be used to authenticate against GCP services. "
+      + "If no credential has been set explicitly, the default is to use the instance factory "
+      + "that constructs a credential based upon the currently set credentialFactoryClass.")
+  @Default.InstanceFactory(GcpUserCredentialsFactory.class)
+  Credentials getGcpCredential();
+  void setGcpCredential(Credentials value);
+
+  /**
+   * Attempts to infer the default project based upon the environment this application
+   * is executing within. Currently this only supports getting the default project from gcloud.
+   */
+  class DefaultProjectFactory implements DefaultValueFactory<String> {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
+
+    @Override
+    public String create(PipelineOptions options) {
+      try {
+        File configFile;
+        if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
+          configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
+        } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
+          configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
+        } else {
+          // New versions of gcloud use this file
+          configFile = new File(
+              System.getProperty("user.home"),
+              ".config/gcloud/configurations/config_default");
+          if (!configFile.exists()) {
+            // Old versions of gcloud use this file
+            configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
+          }
+        }
+        String section = null;
+        Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
+        Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
+        for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
+          line = line.trim();
+          if (line.isEmpty() || line.startsWith(";")) {
+            continue;
+          }
+          Matcher matcher = sectionPattern.matcher(line);
+          if (matcher.matches()) {
+            section = matcher.group(1);
+          } else if (section == null || section.equals("core")) {
+            matcher = projectPattern.matcher(line);
+            if (matcher.matches()) {
+              String project = matcher.group(1).trim();
+              LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
+                  + "project, please cancel this Pipeline and specify the command-line "
+                  + "argument --project.", project);
+              return project;
+            }
+          }
+        }
+      } catch (IOException expected) {
+        LOG.debug("Failed to find default project.", expected);
+      }
+      // return null if can't determine
+      return null;
+    }
+
+    /**
+     * Returns true if running on the Windows OS.
+     */
+    private static boolean isWindows() {
+      return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
+    }
+
+    /**
+     * Used to mock out getting environment variables.
+     */
+    @VisibleForTesting
+    Map<String, String> getEnvironment() {
+        return System.getenv();
+    }
+  }
+
+  /**
+   * Attempts to load the GCP credentials. See
+   * {@link CredentialFactory#getCredential()} for more details.
+   */
+  class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> {
+    @Override
+    public Credentials create(PipelineOptions options) {
+      GcpOptions gcpOptions = options.as(GcpOptions.class);
+      try {
+        CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
+            .fromClass(gcpOptions.getCredentialFactoryClass())
+            .fromFactoryMethod("fromOptions")
+            .withArg(PipelineOptions.class, options)
+            .build();
+        return factory.getCredential();
+      } catch (IOException | GeneralSecurityException e) {
+        throw new RuntimeException("Unable to obtain credential", e);
+      }
+    }
+  }
+
+  /**
+   * A GCS path for storing temporary files in GCP.
+   *
+   * <p>Its default to {@link PipelineOptions#getTempLocation}.
+   */
+  @Description("A GCS path for storing temporary files in GCP.")
+  @Default.InstanceFactory(GcpTempLocationFactory.class)
+  @Nullable String getGcpTempLocation();
+  void setGcpTempLocation(String value);
+
+  /**
+   * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
+   */
+  class GcpTempLocationFactory implements DefaultValueFactory<String> {
+
+    @Override
+    @Nullable
+    public String create(PipelineOptions options) {
+      String tempLocation = options.getTempLocation();
+      if (isNullOrEmpty(tempLocation)) {
+        tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
+        options.setTempLocation(tempLocation);
+      } else {
+        try {
+          PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+          validator.validateOutputFilePrefixSupported(tempLocation);
+        } catch (Exception e) {
+          throw new IllegalArgumentException(String.format(
+              "Error constructing default value for gcpTempLocation: tempLocation is not"
+              + " a valid GCS path, %s. ", tempLocation), e);
+        }
+      }
+      return tempLocation;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..00be440
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.options;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A registrar containing the default GCP options.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>builder()
+        .add(BigQueryOptions.class)
+        .add(GcpOptions.class)
+        .add(GcsOptions.class)
+        .add(GoogleApiDebugOptions.class)
+        .add(PubsubOptions.class)
+        .build();
+  }
+}