You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/07/07 13:22:21 UTC
[beam] branch master updated: Propagate error messages from GcsUtil (#22079)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 03ad079d963 Propagate error messages from GcsUtil (#22079)
03ad079d963 is described below
commit 03ad079d963195edff0fb79bda944dd613b7c692
Author: Pranav Bhandari <bh...@gmail.com>
AuthorDate: Thu Jul 7 09:22:15 2022 -0400
Propagate error messages from GcsUtil (#22079)
---
.../dataflow/BatchStatefulParDoOverridesTest.java | 3 +-
.../dataflow/DataflowPipelineTranslatorTest.java | 3 +-
.../beam/runners/dataflow/DataflowRunnerTest.java | 29 +++++---
.../extensions/gcp/storage/GcsPathValidator.java | 6 +-
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 20 +++++-
.../sdk/extensions/gcp/options/GcpOptionsTest.java | 5 +-
.../gcp/storage/GcsPathValidatorTest.java | 14 ++--
.../beam/sdk/extensions/gcp/util/GcsUtilTest.java | 79 ++++++++++++++++++++++
8 files changed, 137 insertions(+), 22 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index a77c540a48c..de326f71fb7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -189,7 +190,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
GcsUtil mockGcsUtil = mock(GcsUtil.class);
when(mockGcsUtil.expand(any(GcsPath.class)))
.then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+ doNothing().when(mockGcsUtil).verifyBucketAccessible(any(GcsPath.class));
DataflowPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 5a92c889c9d..c6e4d3016be 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -209,7 +210,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
GcsUtil mockGcsUtil = mock(GcsUtil.class);
when(mockGcsUtil.expand(any(GcsPath.class)))
.then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+ doNothing().when(mockGcsUtil).verifyBucketAccessible(any(GcsPath.class));
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9bd6a5d9719..3a8e62c4d66 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.matchesRegex;
@@ -44,6 +45,8 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -294,12 +297,15 @@ public class DataflowRunnerTest implements Serializable {
when(mockGcsUtil.expand(any(GcsPath.class)))
.then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
- when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
- when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true);
- when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging/")))
- .thenReturn(true);
- when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
- when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
+ doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET));
+ doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET));
+ doNothing()
+ .when(mockGcsUtil)
+ .verifyBucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"));
+ doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET));
+ doThrow(new FileNotFoundException())
+ .when(mockGcsUtil)
+ .verifyBucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET));
// Let every valid path be matched
when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
@@ -321,7 +327,7 @@ public class DataflowRunnerTest implements Serializable {
});
// The dataflow pipeline attempts to output to this location.
- when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true);
+ doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri("gs://bucket/object"));
return mockGcsUtil;
}
@@ -1063,7 +1069,8 @@ public class DataflowRunnerTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setGcpTempLocation(NON_EXISTENT_BUCKET);
- thrown.expect(IllegalArgumentException.class);
+ thrown.expect(RuntimeException.class);
+ thrown.expectCause(instanceOf(FileNotFoundException.class));
thrown.expectMessage(
containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
@@ -1078,7 +1085,8 @@ public class DataflowRunnerTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setStagingLocation(NON_EXISTENT_BUCKET);
- thrown.expect(IllegalArgumentException.class);
+ thrown.expect(RuntimeException.class);
+ thrown.expectCause(instanceOf(FileNotFoundException.class));
thrown.expectMessage(
containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
@@ -1093,7 +1101,8 @@ public class DataflowRunnerTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setSaveProfilesToGcs(NON_EXISTENT_BUCKET);
- thrown.expect(IllegalArgumentException.class);
+ thrown.expect(RuntimeException.class);
+ thrown.expectCause(instanceOf(FileNotFoundException.class));
thrown.expectMessage(
containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
DataflowRunner.fromOptions(options);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
index 0fce1f2ba26..df9cc8e0a65 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
@@ -80,11 +80,9 @@ public class GcsPathValidator implements PathValidator {
private void verifyPathIsAccessible(String path, String errorMessage) {
GcsPath gcsPath = getGcsPath(path);
try {
- checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), errorMessage, path);
+ gcpOptions.getGcsUtil().verifyBucketAccessible(gcsPath);
} catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
- e);
+ throw new RuntimeException(String.format(errorMessage, path), e);
}
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index a4938e2a89d..30ce9dbd97e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -605,6 +605,14 @@ public class GcsUtil {
return new GoogleCloudStorageImpl(options, storage, credentials);
}
+ /**
+ * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws
+ * an exception if the bucket is inaccessible due to permissions or does not exist.
+ */
+ public void verifyBucketAccessible(GcsPath path) throws IOException {
+ verifyBucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
+ }
+
/** Returns whether the GCS bucket exists and is accessible. */
public boolean bucketAccessible(GcsPath path) throws IOException {
return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
@@ -640,6 +648,16 @@ public class GcsUtil {
}
}
+ /**
+ * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath, BackOff,
+ * Sleeper)}, but throws an exception if the bucket is inaccessible due to permissions or does
+ * not exist.
+ */
+ @VisibleForTesting
+ void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+ getBucket(path, backoff, sleeper);
+ }
+
@VisibleForTesting
@Nullable
Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
@@ -895,7 +913,7 @@ public class GcsUtil {
readyToEnqueue = false;
lastError = null;
} else {
- throw new FileNotFoundException(from.toString());
+ throw new FileNotFoundException(e.getMessage());
}
} else {
lastError = e;
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
index 0f0b5b312aa..4b57dedc59b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -33,6 +33,7 @@ import com.google.api.services.cloudresourcemanager.CloudResourceManager.Project
import com.google.api.services.cloudresourcemanager.model.Project;
import com.google.api.services.storage.model.Bucket;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -209,7 +210,9 @@ public class GcpOptionsTest {
public void testDefaultGcpTempLocationDoesNotExist() throws IOException {
String tempLocation = "gs://does/not/exist";
options.setTempLocation(tempLocation);
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+ doThrow(new FileNotFoundException())
+ .when(mockGcsUtil)
+ .verifyBucketAccessible(any(GcsPath.class));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Error constructing default value for gcpTempLocation: tempLocation is not"
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
index 924afe747c3..3dddd13c471 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.extensions.gcp.storage;
+import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import java.io.FileNotFoundException;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
@@ -45,7 +48,7 @@ public class GcsPathValidatorTest {
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+ doNothing().when(mockGcsUtil).verifyBucketAccessible(any(GcsPath.class));
GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
options.setGcpCredential(new TestCredential());
options.setGcsUtil(mockGcsUtil);
@@ -75,8 +78,11 @@ public class GcsPathValidatorTest {
@Test
public void testWhenBucketDoesNotExist() throws Exception {
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
- expectedException.expect(IllegalArgumentException.class);
+ doThrow(new FileNotFoundException())
+ .when(mockGcsUtil)
+ .verifyBucketAccessible(any(GcsPath.class));
+ expectedException.expect(RuntimeException.class);
+ expectedException.expectCause(instanceOf(FileNotFoundException.class));
expectedException.expectMessage("Could not find file gs://non-existent-bucket/location");
validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index f965b4d7900..7d190536800 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -744,6 +744,85 @@ public class GcsUtilTest {
new FastNanoClockAndSleeper()));
}
+ @Test
+ public void testVerifyBucketAccessible() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+ Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
+
+ when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+ when(mockStorageGet.execute())
+ .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenReturn(new Bucket());
+
+ gcsUtil.verifyBucketAccessible(
+ GcsPath.fromComponents("testbucket", "testobject"),
+ mockBackOff,
+ new FastNanoClockAndSleeper());
+ }
+
+ @Test(expected = AccessDeniedException.class)
+ public void testVerifyBucketAccessibleAccessError() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+ Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
+ GoogleJsonResponseException expectedException =
+ googleJsonResponseException(
+ HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+ "Waves hand mysteriously",
+ "These aren't the buckets you're looking for");
+
+ when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+ when(mockStorageGet.execute()).thenThrow(expectedException);
+
+ gcsUtil.verifyBucketAccessible(
+ GcsPath.fromComponents("testbucket", "testobject"),
+ mockBackOff,
+ new FastNanoClockAndSleeper());
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testVerifyBucketAccessibleDoesNotExist() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+ Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
+
+ when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+ when(mockStorageGet.execute())
+ .thenThrow(
+ googleJsonResponseException(
+ HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see"));
+
+ gcsUtil.verifyBucketAccessible(
+ GcsPath.fromComponents("testbucket", "testobject"),
+ mockBackOff,
+ new FastNanoClockAndSleeper());
+ }
+
@Test
public void testGetBucket() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();