You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/18 23:19:30 UTC
[4/7] beam git commit: [BEAM-1871] Create new GCP core module package
and move several GCP related classes from beam-sdks-java-core over.
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
deleted file mode 100644
index 23f0418..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.cloudresourcemanager.model.Project;
-import java.net.SocketTimeoutException;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Test case for {@link GcpProjectUtil}. */
-@RunWith(JUnit4.class)
-public class GcpProjectUtilTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- private static CloudResourceManagerOptions crmOptionsWithTestCredential() {
- CloudResourceManagerOptions pipelineOptions =
- PipelineOptionsFactory.as(CloudResourceManagerOptions.class);
- pipelineOptions.setGcpCredential(new TestCredential());
- return pipelineOptions;
- }
-
- @Test
- public void testGetProjectNumber() throws Exception {
- CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential();
- GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil();
-
- CloudResourceManager.Projects mockProjects = Mockito.mock(
- CloudResourceManager.Projects.class);
- CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class);
- projectUtil.setCrmClient(mockCrm);
-
- CloudResourceManager.Projects.Get mockProjectsGet =
- Mockito.mock(CloudResourceManager.Projects.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
- Project project = new Project();
- project.setProjectNumber(5L);
-
- when(mockCrm.projects()).thenReturn(mockProjects);
- when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet);
- when(mockProjectsGet.execute())
- .thenThrow(new SocketTimeoutException("SocketException"))
- .thenReturn(project);
-
- assertEquals(5L, projectUtil.getProjectNumber(
- "foo", mockBackOff, new FastNanoClockAndSleeper()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
deleted file mode 100644
index a29dd45..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import java.util.ServiceLoader;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link GcsIOChannelFactoryRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class GcsIOChannelFactoryRegistrarTest {
-
- @Test
- public void testServiceLoader() {
- for (IOChannelFactoryRegistrar registrar
- : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
- if (registrar instanceof GcsIOChannelFactoryRegistrar) {
- return;
- }
- }
- fail("Expected to find " + GcsIOChannelFactoryRegistrar.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
deleted file mode 100644
index 7248b38..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link GcsIOChannelFactoryTest}. */
-@RunWith(JUnit4.class)
-public class GcsIOChannelFactoryTest {
- private GcsIOChannelFactory factory;
-
- @Before
- public void setUp() {
- factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class));
- }
-
- @Test
- public void testResolve() throws Exception {
- assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object"));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
deleted file mode 100644
index dc36319..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link GcsPathValidator}. */
-@RunWith(JUnit4.class)
-public class GcsPathValidatorTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Mock private GcsUtil mockGcsUtil;
- private GcsPathValidator validator;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
- GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
- options.setGcpCredential(new TestCredential());
- options.setGcsUtil(mockGcsUtil);
- validator = GcsPathValidator.fromOptions(options);
- }
-
- @Test
- public void testValidFilePattern() {
- validator.validateInputFilePatternSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidFilePattern() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Expected a valid 'gs://' path but was given '/local/path'");
- validator.validateInputFilePatternSupported("/local/path");
- }
-
- @Test
- public void testWhenBucketDoesNotExist() throws Exception {
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Could not find file gs://non-existent-bucket/location");
- validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
- }
-
- @Test
- public void testValidOutputPrefix() {
- validator.validateOutputFilePrefixSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidOutputPrefix() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Expected a valid 'gs://' path but was given '/local/path'");
- validator.validateOutputFilePrefixSupported("/local/path");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
deleted file mode 100644
index 03668ce..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpStatusCodes;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.LowLevelHttpRequest;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.json.Json;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.HttpTesting;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.client.util.BackOff;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Bucket;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
-import com.google.cloud.hadoop.util.ClientRequestHelper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigInteger;
-import java.net.SocketTimeoutException;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.AccessDeniedException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Test case for {@link GcsUtil}. */
-@RunWith(JUnit4.class)
-public class GcsUtilTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testGlobTranslation() {
- assertEquals("foo", GcsUtil.globToRegexp("foo"));
- assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o"));
- assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?"));
- assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*"));
- }
-
- private static GcsOptions gcsOptionsWithTestCredential() {
- GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
- pipelineOptions.setGcpCredential(new TestCredential());
- return pipelineOptions;
- }
-
- @Test
- public void testCreationWithDefaultOptions() {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- assertNotNull(pipelineOptions.getGcpCredential());
- }
-
- @Test
- public void testUploadBufferSizeDefault() {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil util = pipelineOptions.getGcsUtil();
- assertNull(util.getUploadBufferSizeBytes());
- }
-
- @Test
- public void testUploadBufferSizeUserSpecified() {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- pipelineOptions.setGcsUploadBufferSizeBytes(12345);
- GcsUtil util = pipelineOptions.getGcsUtil();
- assertEquals((Integer) 12345, util.getUploadBufferSizeBytes());
- }
-
- @Test
- public void testCreationWithExecutorServiceProvided() {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- pipelineOptions.setExecutorService(Executors.newCachedThreadPool());
- assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService);
- }
-
- @Test
- public void testCreationWithGcsUtilProvided() {
- GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
- GcsUtil gcsUtil = Mockito.mock(GcsUtil.class);
- pipelineOptions.setGcsUtil(gcsUtil);
- assertSame(gcsUtil, pipelineOptions.getGcsUtil());
- }
-
- @Test
- public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception {
- GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
- ExecutorService executorService = pipelineOptions.getExecutorService();
-
- int numThreads = 100;
- final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads];
- for (int i = 0; i < numThreads; i++) {
- final int currentLatch = i;
- countDownLatches[i] = new CountDownLatch(1);
- executorService.execute(
- new Runnable() {
- @Override
- public void run() {
- // Wait for latch N and then release latch N - 1
- try {
- countDownLatches[currentLatch].await();
- if (currentLatch > 0) {
- countDownLatches[currentLatch - 1].countDown();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- // Release the last latch starting the chain reaction.
- countDownLatches[countDownLatches.length - 1].countDown();
- executorService.shutdown();
- assertTrue("Expected tasks to complete",
- executorService.awaitTermination(10, TimeUnit.SECONDS));
- }
-
- @Test
- public void testGlobExpansion() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
- Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
- Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);
-
- Objects modelObjects = new Objects();
- List<StorageObject> items = new ArrayList<>();
- // A directory
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
-
- // Files within the directory
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name"));
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file2name"));
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name"));
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile"));
-
- modelObjects.setItems(items);
-
- when(mockStorage.objects()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")).thenReturn(
- mockStorageGet);
- when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
- when(mockStorageGet.execute()).thenReturn(
- new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
- when(mockStorageList.execute()).thenReturn(modelObjects);
-
- // Test a single file.
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
- List<GcsPath> expectedFiles =
- ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
-
- assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
- }
-
- // Test patterns.
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
- List<GcsPath> expectedFiles = ImmutableList.of(
- GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
- assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
- }
-
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
- List<GcsPath> expectedFiles = ImmutableList.of(
- GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
- assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
- }
-
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
- List<GcsPath> expectedFiles = ImmutableList.of(
- GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
- assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
- }
-
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
- List<GcsPath> expectedFiles = ImmutableList.of(
- GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
-
- assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
- }
- }
-
- // Patterns that contain recursive wildcards ('**') are not supported.
- @Test
- public void testRecursiveGlobExpansionFails() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**");
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Unsupported wildcard usage");
- gcsUtil.expand(pattern);
- }
-
- // GCSUtil.expand() should fail when matching a single object when that object does not exist.
- // We should return the empty result since GCS get object is strongly consistent.
- @Test
- public void testNonExistentObjectReturnsEmptyResult() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
- Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile");
- GoogleJsonResponseException expectedException =
- googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
- "It don't exist", "Nothing here to see");
-
- when(mockStorage.objects()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
- mockStorageGet);
- when(mockStorageGet.execute()).thenThrow(expectedException);
-
- assertEquals(Collections.EMPTY_LIST, gcsUtil.expand(pattern));
- }
-
- // GCSUtil.expand() should fail for other errors such as access denied.
- @Test
- public void testAccessDeniedObjectThrowsIOException() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
- Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile");
- GoogleJsonResponseException expectedException =
- googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
- "Waves hand mysteriously", "These aren't the buckets you're looking for");
-
- when(mockStorage.objects()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
- mockStorageGet);
- when(mockStorageGet.execute()).thenThrow(expectedException);
-
- thrown.expect(IOException.class);
- thrown.expectMessage("Unable to get the file object for path");
- gcsUtil.expand(pattern);
- }
-
- @Test
- public void testFileSizeNonBatch() throws Exception {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
- Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
- when(mockStorage.objects()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
- when(mockStorageGet.execute()).thenReturn(
- new StorageObject().setSize(BigInteger.valueOf(1000)));
-
- assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")));
- }
-
- @Test
- public void testFileSizeWhenFileNotFoundNonBatch() throws Exception {
- MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse();
- notFoundResponse.setContent("");
- notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
-
- MockHttpTransport mockTransport =
- new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
-
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
-
- thrown.expect(FileNotFoundException.class);
- gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
- }
-
- @Test
- public void testRetryFileSizeNonBatch() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
- Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
-
- when(mockStorage.objects()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
- when(mockStorageGet.execute())
- .thenThrow(new SocketTimeoutException("SocketException"))
- .thenThrow(new SocketTimeoutException("SocketException"))
- .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
-
- assertEquals(1000,
- gcsUtil.getObject(
- GcsPath.fromComponents("testbucket", "testobject"),
- mockBackOff,
- new FastNanoClockAndSleeper()).getSize().longValue());
- assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
- }
-
- @Test
- public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception {
- JsonFactory jsonFactory = new JacksonFactory();
-
- String contentBoundary = "batch_foobarbaz";
- String contentBoundaryLine = "--" + contentBoundary;
- String endOfContentBoundaryLine = "--" + contentBoundary + "--";
-
- GenericJson error = new GenericJson()
- .set("error", new GenericJson().set("code", 404));
- error.setFactory(jsonFactory);
-
- String content = contentBoundaryLine + "\n"
- + "Content-Type: application/http\n"
- + "\n"
- + "HTTP/1.1 404 Not Found\n"
- + "Content-Length: -1\n"
- + "\n"
- + error.toString()
- + "\n"
- + "\n"
- + endOfContentBoundaryLine
- + "\n";
- thrown.expect(FileNotFoundException.class);
- MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse()
- .setContentType("multipart/mixed; boundary=" + contentBoundary)
- .setContent(content)
- .setStatusCode(HttpStatusCodes.STATUS_CODE_OK);
-
- MockHttpTransport mockTransport =
- new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
-
- GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
- gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
- gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
- }
-
- @Test
- public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception {
- JsonFactory jsonFactory = new JacksonFactory();
-
- String contentBoundary = "batch_foobarbaz";
- String contentBoundaryLine = "--" + contentBoundary;
- String endOfContentBoundaryLine = "--" + contentBoundary + "--";
-
- GenericJson error = new GenericJson()
- .set("error", new GenericJson().set("code", 404));
- error.setFactory(jsonFactory);
-
- String content = contentBoundaryLine + "\n"
- + "Content-Type: application/http\n"
- + "\n"
- + "HTTP/1.1 404 Not Found\n"
- + "Content-Length: -1\n"
- + "\n"
- + error.toString()
- + "\n"
- + "\n"
- + endOfContentBoundaryLine
- + "\n";
- thrown.expect(FileNotFoundException.class);
-
- final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class);
- when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary);
-
- // 429: Too many requests, then 200: OK.
- when(mockResponse.getStatusCode()).thenReturn(429, 200);
- when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content));
-
- // A mock transport that lets us mock the API responses.
- MockHttpTransport mockTransport =
- new MockHttpTransport.Builder()
- .setLowLevelHttpRequest(
- new MockLowLevelHttpRequest() {
- @Override
- public LowLevelHttpResponse execute() throws IOException {
- return mockResponse;
- }
- })
- .build();
-
- GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
- gcsUtil.setStorageClient(
- new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
- gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
- }
-
- @Test
- public void testCreateBucket() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
- when(mockStorage.buckets()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.insert(
- any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
- when(mockStorageInsert.execute())
- .thenThrow(new SocketTimeoutException("SocketException"))
- .thenReturn(new Bucket());
-
- gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
- }
-
- @Test
- public void testCreateBucketAccessErrors() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
- Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
- GoogleJsonResponseException expectedException =
- googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
- "Waves hand mysteriously", "These aren't the buckets you're looking for");
-
- when(mockStorage.buckets()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.insert(
- any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
- when(mockStorageInsert.execute())
- .thenThrow(expectedException);
-
- thrown.expect(AccessDeniedException.class);
-
- gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
- }
-
- @Test
- public void testBucketAccessible() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
- Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
- when(mockStorage.buckets()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
- when(mockStorageGet.execute())
- .thenThrow(new SocketTimeoutException("SocketException"))
- .thenReturn(new Bucket());
-
- assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
- mockBackOff, new FastNanoClockAndSleeper()));
- }
-
- @Test
- public void testBucketDoesNotExistBecauseOfAccessError() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
- Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
- GoogleJsonResponseException expectedException =
- googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
- "Waves hand mysteriously", "These aren't the buckets you're looking for");
-
- when(mockStorage.buckets()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
- when(mockStorageGet.execute())
- .thenThrow(expectedException);
-
- assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
- mockBackOff, new FastNanoClockAndSleeper()));
- }
-
- @Test
- public void testBucketDoesNotExist() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
- Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
- when(mockStorage.buckets()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
- when(mockStorageGet.execute())
- .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
- "It don't exist", "Nothing here to see"));
-
- assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
- mockBackOff, new FastNanoClockAndSleeper()));
- }
-
- @Test
- public void testGetBucket() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
- Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
- when(mockStorage.buckets()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
- when(mockStorageGet.execute())
- .thenThrow(new SocketTimeoutException("SocketException"))
- .thenReturn(new Bucket());
-
- assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"),
- mockBackOff, new FastNanoClockAndSleeper()));
- }
-
- @Test
- public void testGetBucketNotExists() throws IOException {
- GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
- GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
-
- Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
-
- Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
- Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
-
- when(mockStorage.buckets()).thenReturn(mockStorageObjects);
- when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
- when(mockStorageGet.execute())
- .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
- "It don't exist", "Nothing here to see"));
-
- thrown.expect(FileNotFoundException.class);
- thrown.expectMessage("It don't exist");
- gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"),
- mockBackOff, new FastNanoClockAndSleeper());
- }
-
- @Test
- public void testGCSChannelCloseIdempotent() throws IOException {
- SeekableByteChannel channel =
- new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null,
- new ClientRequestHelper<StorageObject>());
- channel.close();
- channel.close();
- }
-
- /**
- * Builds a fake GoogleJsonResponseException for testing API error handling.
- */
- private static GoogleJsonResponseException googleJsonResponseException(
- final int status, final String reason, final String message) throws IOException {
- final JsonFactory jsonFactory = new JacksonFactory();
- HttpTransport transport = new MockHttpTransport() {
- @Override
- public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
- ErrorInfo errorInfo = new ErrorInfo();
- errorInfo.setReason(reason);
- errorInfo.setMessage(message);
- errorInfo.setFactory(jsonFactory);
- GenericJson error = new GenericJson();
- error.set("code", status);
- error.set("errors", Arrays.asList(errorInfo));
- error.setFactory(jsonFactory);
- GenericJson errorResponse = new GenericJson();
- errorResponse.set("error", error);
- errorResponse.setFactory(jsonFactory);
- return new MockLowLevelHttpRequest().setResponse(
- new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
- .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
- }
- };
- HttpRequest request =
- transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
- request.setThrowExceptionOnExecuteError(false);
- HttpResponse response = request.execute();
- return GoogleJsonResponseException.from(jsonFactory, response);
- }
-
- private static List<String> makeStrings(String s, int n) {
- ImmutableList.Builder<String> ret = ImmutableList.builder();
- for (int i = 0; i < n; ++i) {
- ret.add(String.format("gs://bucket/%s%d", s, i));
- }
- return ret.build();
- }
-
- private static List<GcsPath> makeGcsPaths(String s, int n) {
- ImmutableList.Builder<GcsPath> ret = ImmutableList.builder();
- for (int i = 0; i < n; ++i) {
- ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i)));
- }
- return ret.build();
- }
-
- private static int sumBatchSizes(List<BatchRequest> batches) {
- int ret = 0;
- for (BatchRequest b : batches) {
- ret += b.size();
- assertThat(b.size(), greaterThan(0));
- }
- return ret;
- }
-
- @Test
- public void testMakeCopyBatches() throws IOException {
- GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
- // Small number of files fits in 1 batch
- List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3));
- assertThat(batches.size(), equalTo(1));
- assertThat(sumBatchSizes(batches), equalTo(3));
-
- // 1 batch of files fits in 1 batch
- batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100));
- assertThat(batches.size(), equalTo(1));
- assertThat(sumBatchSizes(batches), equalTo(100));
-
- // A little more than 5 batches of files fits in 6 batches
- batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501));
- assertThat(batches.size(), equalTo(6));
- assertThat(sumBatchSizes(batches), equalTo(501));
- }
-
- @Test
- public void testInvalidCopyBatches() throws IOException {
- GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Number of source files 3");
-
- gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1));
- }
-
- @Test
- public void testMakeRemoveBatches() throws IOException {
- GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
- // Small number of files fits in 1 batch
- List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3));
- assertThat(batches.size(), equalTo(1));
- assertThat(sumBatchSizes(batches), equalTo(3));
-
- // 1 batch of files fits in 1 batch
- batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
- assertThat(batches.size(), equalTo(1));
- assertThat(sumBatchSizes(batches), equalTo(100));
-
- // A little more than 5 batches of files fits in 6 batches
- batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
- assertThat(batches.size(), equalTo(6));
- assertThat(sumBatchSizes(batches), equalTo(501));
- }
-
- @Test
- public void testMakeGetBatches() throws IOException {
- GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
-
- // Small number of files fits in 1 batch
- List<StorageObjectOrIOException[]> results = Lists.newArrayList();
- List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results);
- assertThat(batches.size(), equalTo(1));
- assertThat(sumBatchSizes(batches), equalTo(3));
- assertEquals(3, results.size());
-
- // 1 batch of files fits in 1 batch
- results = Lists.newArrayList();
- batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results);
- assertThat(batches.size(), equalTo(1));
- assertThat(sumBatchSizes(batches), equalTo(100));
- assertEquals(100, results.size());
-
- // A little more than 5 batches of files fits in 6 batches
- results = Lists.newArrayList();
- batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results);
- assertThat(batches.size(), equalTo(6));
- assertThat(sumBatchSizes(batches), equalTo(501));
- assertEquals(501, results.size());
- }
-
- /**
- * A helper to wrap a {@link GenericJson} object in a content stream.
- */
- private static InputStream toStream(String content) throws IOException {
- return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
deleted file mode 100644
index 8e7878c..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link IntervalBoundedExponentialBackOff}. */
-@RunWith(JUnit4.class)
-public class IntervalBoundedExponentialBackOffTest {
- @Rule public ExpectedException exception = ExpectedException.none();
-
-
- @Test
- public void testUsingInvalidInitialInterval() throws Exception {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Initial interval must be greater than zero.");
- new IntervalBoundedExponentialBackOff(1000L, 0L);
- }
-
- @Test
- public void testUsingInvalidMaximumInterval() throws Exception {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Maximum interval must be greater than zero.");
- new IntervalBoundedExponentialBackOff(-1L, 10L);
- }
-
- @Test
- public void testThatcertainNumberOfAttemptsReachesMaxInterval() throws Exception {
- IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- }
-
- @Test
- public void testThatResettingAllowsReuse() throws Exception {
- IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- }
-
- @Test
- public void testAtMaxInterval() throws Exception {
- IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500);
- assertFalse(backOff.atMaxInterval());
- backOff.nextBackOffMillis();
- assertFalse(backOff.atMaxInterval());
- backOff.nextBackOffMillis();
- assertTrue(backOff.atMaxInterval());
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
- lessThanOrEqualTo(1500L)));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
deleted file mode 100644
index 71554b5..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpResponseException;
-import com.google.api.client.http.HttpResponseInterceptor;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.LowLevelHttpRequest;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.Storage.Objects.Get;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.security.PrivateKey;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicLong;
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Tests for RetryHttpRequestInitializer.
- */
-@RunWith(JUnit4.class)
-public class RetryHttpRequestInitializerTest {
-
- @Mock private Credential mockCredential;
- @Mock private PrivateKey mockPrivateKey;
- @Mock private LowLevelHttpRequest mockLowLevelRequest;
- @Mock private LowLevelHttpResponse mockLowLevelResponse;
- @Mock private HttpResponseInterceptor mockHttpResponseInterceptor;
-
- private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
- private Storage storage;
-
- // Used to test retrying a request more than the default 10 times.
- static class MockNanoClock implements NanoClock {
- private int timesMs[] = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543,
- 12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420};
- private int i = 0;
-
- @Override
- public long nanoTime() {
- return timesMs[i++ / 2] * 1000000;
- }
- }
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- HttpTransport lowLevelTransport = new HttpTransport() {
- @Override
- protected LowLevelHttpRequest buildRequest(String method, String url)
- throws IOException {
- return mockLowLevelRequest;
- }
- };
-
- // Retry initializer will pass through to credential, since we can have
- // only a single HttpRequestInitializer, and we use multiple Credential
- // types in the SDK, not all of which allow for retry configuration.
- RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer(
- mockCredential, new MockNanoClock(), new Sleeper() {
- @Override
- public void sleep(long millis) throws InterruptedException {}
- }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor);
- storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer)
- .setApplicationName("test").build();
- }
-
- @After
- public void tearDown() {
- verifyNoMoreInteractions(mockPrivateKey);
- verifyNoMoreInteractions(mockLowLevelRequest);
- verifyNoMoreInteractions(mockCredential);
- verifyNoMoreInteractions(mockHttpResponseInterceptor);
- }
-
- @Test
- public void testBasicOperation() throws IOException {
- when(mockLowLevelRequest.execute())
- .thenReturn(mockLowLevelResponse);
- when(mockLowLevelResponse.getStatusCode())
- .thenReturn(200);
-
- Storage.Buckets.Get result = storage.buckets().get("test");
- HttpResponse response = result.executeUnparsed();
- assertNotNull(response);
-
- verify(mockCredential).initialize(any(HttpRequest.class));
- verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
- verify(mockLowLevelRequest, atLeastOnce())
- .addHeader(anyString(), anyString());
- verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
- verify(mockLowLevelRequest).execute();
- verify(mockLowLevelResponse).getStatusCode();
- }
-
- /**
- * Tests that a non-retriable error is not retried.
- */
- @Test
- public void testErrorCodeForbidden() throws IOException {
- when(mockLowLevelRequest.execute())
- .thenReturn(mockLowLevelResponse);
- when(mockLowLevelResponse.getStatusCode())
- .thenReturn(403) // Non-retryable error.
- .thenReturn(200); // Shouldn't happen.
-
- try {
- Storage.Buckets.Get result = storage.buckets().get("test");
- HttpResponse response = result.executeUnparsed();
- assertNotNull(response);
- } catch (HttpResponseException e) {
- Assert.assertThat(e.getMessage(), Matchers.containsString("403"));
- }
-
- verify(mockCredential).initialize(any(HttpRequest.class));
- verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
- verify(mockLowLevelRequest, atLeastOnce())
- .addHeader(anyString(), anyString());
- verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
- verify(mockLowLevelRequest).execute();
- verify(mockLowLevelResponse).getStatusCode();
- }
-
- /**
- * Tests that a retriable error is retried.
- */
- @Test
- public void testRetryableError() throws IOException {
- when(mockLowLevelRequest.execute())
- .thenReturn(mockLowLevelResponse)
- .thenReturn(mockLowLevelResponse)
- .thenReturn(mockLowLevelResponse);
- when(mockLowLevelResponse.getStatusCode())
- .thenReturn(503) // Retryable
- .thenReturn(429) // We also retry on 429 Too Many Requests.
- .thenReturn(200);
-
- Storage.Buckets.Get result = storage.buckets().get("test");
- HttpResponse response = result.executeUnparsed();
- assertNotNull(response);
-
- verify(mockCredential).initialize(any(HttpRequest.class));
- verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
- verify(mockLowLevelRequest, atLeastOnce())
- .addHeader(anyString(), anyString());
- verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt());
- verify(mockLowLevelRequest, times(3)).execute();
- verify(mockLowLevelResponse, times(3)).getStatusCode();
- }
-
- /**
- * Tests that an IOException is retried.
- */
- @Test
- public void testThrowIOException() throws IOException {
- when(mockLowLevelRequest.execute())
- .thenThrow(new IOException("Fake Error"))
- .thenReturn(mockLowLevelResponse);
- when(mockLowLevelResponse.getStatusCode())
- .thenReturn(200);
-
- Storage.Buckets.Get result = storage.buckets().get("test");
- HttpResponse response = result.executeUnparsed();
- assertNotNull(response);
-
- verify(mockCredential).initialize(any(HttpRequest.class));
- verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
- verify(mockLowLevelRequest, atLeastOnce())
- .addHeader(anyString(), anyString());
- verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt());
- verify(mockLowLevelRequest, times(2)).execute();
- verify(mockLowLevelResponse).getStatusCode();
- }
-
- /**
- * Tests that a retryable error is retried enough times.
- */
- @Test
- public void testRetryableErrorRetryEnoughTimes() throws IOException {
- when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse);
- final int retries = 10;
- when(mockLowLevelResponse.getStatusCode()).thenAnswer(new Answer<Integer>(){
- int n = 0;
- @Override
- public Integer answer(InvocationOnMock invocation) {
- return (n++ < retries - 1) ? 503 : 200;
- }});
-
- Storage.Buckets.Get result = storage.buckets().get("test");
- HttpResponse response = result.executeUnparsed();
- assertNotNull(response);
-
- verify(mockCredential).initialize(any(HttpRequest.class));
- verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
- verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(),
- anyString());
- verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt());
- verify(mockLowLevelRequest, times(retries)).execute();
- verify(mockLowLevelResponse, times(retries)).getStatusCode();
- }
-
- /**
- * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler
- * is invoked.
- */
- @Test
- public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception {
- // Counts the number of calls to execute the HTTP request.
- final AtomicLong executeCount = new AtomicLong();
-
- // 10 is a private internal constant in the Google API Client library. See
- // com.google.api.client.http.HttpRequest#setNumberOfRetries
- // TODO: update this test once the private internal constant is public.
- final int defaultNumberOfRetries = 10;
-
- // A mock HTTP request that always throws SocketTimeoutException.
- MockHttpTransport transport =
- new MockHttpTransport.Builder().setLowLevelHttpRequest(new MockLowLevelHttpRequest() {
- @Override
- public LowLevelHttpResponse execute() throws IOException {
- executeCount.incrementAndGet();
- throw new SocketTimeoutException("Fake forced timeout exception");
- }
- }).build();
-
- // A sample HTTP request to Google Cloud Storage that uses both default Transport and default
- // RetryHttpInitializer.
- Storage storage = new Storage.Builder(
- transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build();
-
- Get getRequest = storage.objects().get("gs://fake", "file");
-
- try {
- getRequest.execute();
- fail();
- } catch (Throwable e) {
- assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
- assertEquals(1 + defaultNumberOfRetries, executeCount.get());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/pom.xml b/sdks/java/extensions/gcp-core/pom.xml
new file mode 100644
index 0000000..d566f94
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core</name>
+ <description>Common components used to support multiple
+ Google Cloud Platform specific maven modules.</description>
+
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludedGroups>
+ org.apache.beam.sdk.testing.NeedsRunner
+ </excludedGroups>
+ <systemPropertyVariables>
+ <beamUseDummyRunner>true</beamUseDummyRunner>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+
+ <!-- Coverage analysis for unit tests. -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client-jackson2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.oauth-client</groupId>
+ <artifactId>google-oauth-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-oauth2-http</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.api-client</groupId>
+ <artifactId>google-api-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>gcsio</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>util</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-cloudresourcemanager</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-pubsub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-bigquery</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-storage</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-credentials</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <!-- build dependencies -->
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
new file mode 100644
index 0000000..7672cd7
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+/**
+ * Properties needed when using Google BigQuery with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google BigQuery. See "
+ + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
+public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
+ PipelineOptions, StreamingOptions {
+ @Description("Temporary dataset for BigQuery table operations. "
+ + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
+ @Default.String("bigquery.googleapis.com/cloud_dataflow")
+ String getTempDatasetId();
+ void setTempDatasetId(String value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
new file mode 100644
index 0000000..13fdaf3
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.util.GcpProjectUtil;
+
+/**
+ * Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
+ */
+@Description("Options that are used to configure Google CloudResourceManager. See "
+ + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
+public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
+ PipelineOptions, StreamingOptions {
+ /**
+ * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage.
+ */
+ @JsonIgnore
+ @Description("The GcpProjectUtil instance that should be used to communicate"
+ + " with Google Cloud Resource Manager.")
+ @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
+ @Hidden
+ GcpProjectUtil getGcpProjectUtil();
+ void setGcpProjectUtil(GcpProjectUtil value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
new file mode 100644
index 0000000..d01406f
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.auth.Credentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.CredentialFactory;
+import org.apache.beam.sdk.util.DefaultBucket;
+import org.apache.beam.sdk.util.GcpCredentialFactory;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Options used to configure Google Cloud Platform specific options such as the project
+ * and credentials.
+ *
+ * <p>These options defer to the
+ * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
+ * application default credentials</a> for authentication. See the
+ * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for
+ * alternative mechanisms for creating credentials.
+ */
+@Description("Options used to configure Google Cloud Platform project and credentials.")
+public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
+ /**
+ * Project id to use when launching jobs.
+ */
+ @Description("Project id. Required when using Google Cloud Platform services. "
+ + "See https://cloud.google.com/storage/docs/projects for further details.")
+ @Default.InstanceFactory(DefaultProjectFactory.class)
+ String getProject();
+ void setProject(String value);
+
+ /**
+ * GCP <a href="https://developers.google.com/compute/docs/zones"
+ * >availability zone</a> for operations.
+ *
+ * <p>Default is set on a per-service basis.
+ */
+ @Description("GCP availability zone for running GCP operations. "
+ + "Default is up to the individual service.")
+ String getZone();
+ void setZone(String value);
+
+ /**
+ * The class of the credential factory that should be created and used to create
+ * credentials. If gcpCredential has not been set explicitly, an instance of this class will
+ * be constructed and used as a credential factory.
+ */
+ @Description("The class of the credential factory that should be created and used to create "
+ + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
+ + "be constructed and used as a credential factory.")
+ @Default.Class(GcpCredentialFactory.class)
+ Class<? extends CredentialFactory> getCredentialFactoryClass();
+ void setCredentialFactoryClass(
+ Class<? extends CredentialFactory> credentialFactoryClass);
+
+ /**
+ * The credential instance that should be used to authenticate against GCP services.
+ * If no credential has been set explicitly, the default is to use the instance factory
+ * that constructs a credential based upon the currently set credentialFactoryClass.
+ */
+ @JsonIgnore
+ @Description("The credential instance that should be used to authenticate against GCP services. "
+ + "If no credential has been set explicitly, the default is to use the instance factory "
+ + "that constructs a credential based upon the currently set credentialFactoryClass.")
+ @Default.InstanceFactory(GcpUserCredentialsFactory.class)
+ Credentials getGcpCredential();
+ void setGcpCredential(Credentials value);
+
+ /**
+ * Attempts to infer the default project based upon the environment this application
+ * is executing within. Currently this only supports getting the default project from gcloud.
+ */
+ class DefaultProjectFactory implements DefaultValueFactory<String> {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
+
+ @Override
+ public String create(PipelineOptions options) {
+ try {
+ File configFile;
+ if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
+ configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
+ } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
+ configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
+ } else {
+ // New versions of gcloud use this file
+ configFile = new File(
+ System.getProperty("user.home"),
+ ".config/gcloud/configurations/config_default");
+ if (!configFile.exists()) {
+ // Old versions of gcloud use this file
+ configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
+ }
+ }
+ String section = null;
+ Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
+ Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
+ for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
+ line = line.trim();
+ if (line.isEmpty() || line.startsWith(";")) {
+ continue;
+ }
+ Matcher matcher = sectionPattern.matcher(line);
+ if (matcher.matches()) {
+ section = matcher.group(1);
+ } else if (section == null || section.equals("core")) {
+ matcher = projectPattern.matcher(line);
+ if (matcher.matches()) {
+ String project = matcher.group(1).trim();
+ LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
+ + "project, please cancel this Pipeline and specify the command-line "
+ + "argument --project.", project);
+ return project;
+ }
+ }
+ }
+ } catch (IOException expected) {
+ LOG.debug("Failed to find default project.", expected);
+ }
+ // return null if can't determine
+ return null;
+ }
+
+ /**
+ * Returns true if running on the Windows OS.
+ */
+ private static boolean isWindows() {
+ return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
+ }
+
+ /**
+ * Used to mock out getting environment variables.
+ */
+ @VisibleForTesting
+ Map<String, String> getEnvironment() {
+ return System.getenv();
+ }
+ }
+
+ /**
+ * Attempts to load the GCP credentials. See
+ * {@link CredentialFactory#getCredential()} for more details.
+ */
+ class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> {
+ @Override
+ public Credentials create(PipelineOptions options) {
+ GcpOptions gcpOptions = options.as(GcpOptions.class);
+ try {
+ CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
+ .fromClass(gcpOptions.getCredentialFactoryClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ return factory.getCredential();
+ } catch (IOException | GeneralSecurityException e) {
+ throw new RuntimeException("Unable to obtain credential", e);
+ }
+ }
+ }
+
+ /**
+ * A GCS path for storing temporary files in GCP.
+ *
+ * <p>Its default to {@link PipelineOptions#getTempLocation}.
+ */
+ @Description("A GCS path for storing temporary files in GCP.")
+ @Default.InstanceFactory(GcpTempLocationFactory.class)
+ @Nullable String getGcpTempLocation();
+ void setGcpTempLocation(String value);
+
+ /**
+ * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
+ */
+ class GcpTempLocationFactory implements DefaultValueFactory<String> {
+
+ @Override
+ @Nullable
+ public String create(PipelineOptions options) {
+ String tempLocation = options.getTempLocation();
+ if (isNullOrEmpty(tempLocation)) {
+ tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
+ options.setTempLocation(tempLocation);
+ } else {
+ try {
+ PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+ validator.validateOutputFilePrefixSupported(tempLocation);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path, %s. ", tempLocation), e);
+ }
+ }
+ return tempLocation;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..00be440
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.options;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A registrar containing the default GCP options.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>builder()
+ .add(BigQueryOptions.class)
+ .add(GcpOptions.class)
+ .add(GcsOptions.class)
+ .add(GoogleApiDebugOptions.class)
+ .add(PubsubOptions.class)
+ .build();
+ }
+}