You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/07/07 13:22:21 UTC

[beam] branch master updated: Propagate error messages from GcsUtil (#22079)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 03ad079d963 Propagate error messages from GcsUtil (#22079)
03ad079d963 is described below

commit 03ad079d963195edff0fb79bda944dd613b7c692
Author: Pranav Bhandari <bh...@gmail.com>
AuthorDate: Thu Jul 7 09:22:15 2022 -0400

    Propagate error messages from GcsUtil (#22079)
---
 .../dataflow/BatchStatefulParDoOverridesTest.java  |  3 +-
 .../dataflow/DataflowPipelineTranslatorTest.java   |  3 +-
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 29 +++++---
 .../extensions/gcp/storage/GcsPathValidator.java   |  6 +-
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      | 20 +++++-
 .../sdk/extensions/gcp/options/GcpOptionsTest.java |  5 +-
 .../gcp/storage/GcsPathValidatorTest.java          | 14 ++--
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  | 79 ++++++++++++++++++++++
 8 files changed, 137 insertions(+), 22 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index a77c540a48c..de326f71fb7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -189,7 +190,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
     GcsUtil mockGcsUtil = mock(GcsUtil.class);
     when(mockGcsUtil.expand(any(GcsPath.class)))
         .then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+    doNothing().when(mockGcsUtil).verifyBucketAccessible(any(GcsPath.class));
 
     DataflowPipelineOptions options =
         PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 5a92c889c9d..c6e4d3016be 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -209,7 +210,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     GcsUtil mockGcsUtil = mock(GcsUtil.class);
     when(mockGcsUtil.expand(any(GcsPath.class)))
         .then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+    doNothing().when(mockGcsUtil).verifyBucketAccessible(any(GcsPath.class));
 
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9bd6a5d9719..3a8e62c4d66 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.matchesRegex;
@@ -44,6 +45,8 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -294,12 +297,15 @@ public class DataflowRunnerTest implements Serializable {
 
     when(mockGcsUtil.expand(any(GcsPath.class)))
         .then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
-    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
-    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true);
-    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging/")))
-        .thenReturn(true);
-    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
-    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
+    doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET));
+    doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET));
+    doNothing()
+        .when(mockGcsUtil)
+        .verifyBucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"));
+    doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET));
+    doThrow(new FileNotFoundException())
+        .when(mockGcsUtil)
+        .verifyBucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET));
 
     // Let every valid path be matched
     when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
@@ -321,7 +327,7 @@ public class DataflowRunnerTest implements Serializable {
             });
 
     // The dataflow pipeline attempts to output to this location.
-    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true);
+    doNothing().when(mockGcsUtil).verifyBucketAccessible(GcsPath.fromUri("gs://bucket/object"));
 
     return mockGcsUtil;
   }
@@ -1063,7 +1069,8 @@ public class DataflowRunnerTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setGcpTempLocation(NON_EXISTENT_BUCKET);
 
-    thrown.expect(IllegalArgumentException.class);
+    thrown.expect(RuntimeException.class);
+    thrown.expectCause(instanceOf(FileNotFoundException.class));
     thrown.expectMessage(
         containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
     DataflowRunner.fromOptions(options);
@@ -1078,7 +1085,8 @@ public class DataflowRunnerTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setStagingLocation(NON_EXISTENT_BUCKET);
 
-    thrown.expect(IllegalArgumentException.class);
+    thrown.expect(RuntimeException.class);
+    thrown.expectCause(instanceOf(FileNotFoundException.class));
     thrown.expectMessage(
         containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
     DataflowRunner.fromOptions(options);
@@ -1093,7 +1101,8 @@ public class DataflowRunnerTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setSaveProfilesToGcs(NON_EXISTENT_BUCKET);
 
-    thrown.expect(IllegalArgumentException.class);
+    thrown.expect(RuntimeException.class);
+    thrown.expectCause(instanceOf(FileNotFoundException.class));
     thrown.expectMessage(
         containsString("Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET));
     DataflowRunner.fromOptions(options);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
index 0fce1f2ba26..df9cc8e0a65 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
@@ -80,11 +80,9 @@ public class GcsPathValidator implements PathValidator {
   private void verifyPathIsAccessible(String path, String errorMessage) {
     GcsPath gcsPath = getGcsPath(path);
     try {
-      checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), errorMessage, path);
+      gcpOptions.getGcsUtil().verifyBucketAccessible(gcsPath);
     } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
+      throw new RuntimeException(String.format(errorMessage, path), e);
     }
   }
 
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index a4938e2a89d..30ce9dbd97e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -605,6 +605,14 @@ public class GcsUtil {
     return new GoogleCloudStorageImpl(options, storage, credentials);
   }
 
+  /**
+   * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws
+   * exception if the bucket is inaccessible due to permissions or does not exist.
+   */
+  public void verifyBucketAccessible(GcsPath path) throws IOException {
+    verifyBucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
+  }
+
   /** Returns whether the GCS bucket exists and is accessible. */
   public boolean bucketAccessible(GcsPath path) throws IOException {
     return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
@@ -640,6 +648,16 @@ public class GcsUtil {
     }
   }
 
+  /**
+   * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath, BackOff,
+   * Sleeper)}, but throws exception if the bucket is inaccessible due to permissions or does not
+   * exist.
+   */
+  @VisibleForTesting
+  void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+    getBucket(path, backoff, sleeper);
+  }
+
   @VisibleForTesting
   @Nullable
   Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
@@ -895,7 +913,7 @@ public class GcsUtil {
           readyToEnqueue = false;
           lastError = null;
         } else {
-          throw new FileNotFoundException(from.toString());
+          throw new FileNotFoundException(e.getMessage());
         }
       } else {
         lastError = e;
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
index 0f0b5b312aa..4b57dedc59b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -33,6 +33,7 @@ import com.google.api.services.cloudresourcemanager.CloudResourceManager.Project
 import com.google.api.services.cloudresourcemanager.model.Project;
 import com.google.api.services.storage.model.Bucket;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -209,7 +210,9 @@ public class GcpOptionsTest {
     public void testDefaultGcpTempLocationDoesNotExist() throws IOException {
       String tempLocation = "gs://does/not/exist";
       options.setTempLocation(tempLocation);
-      when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+      doThrow(new FileNotFoundException())
+          .when(mockGcsUtil)
+          .verifyBucketAccessible(any(GcsPath.class));
       thrown.expect(IllegalArgumentException.class);
       thrown.expectMessage(
           "Error constructing default value for gcpTempLocation: tempLocation is not"
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
index 924afe747c3..3dddd13c471 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.extensions.gcp.storage;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 
+import java.io.FileNotFoundException;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
@@ -45,7 +48,7 @@ public class GcsPathValidatorTest {
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+    doNothing().when(mockGcsUtil).verifyBucketAccessible(any(GcsPath.class));
     GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
     options.setGcsUtil(mockGcsUtil);
@@ -75,8 +78,11 @@ public class GcsPathValidatorTest {
 
   @Test
   public void testWhenBucketDoesNotExist() throws Exception {
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
-    expectedException.expect(IllegalArgumentException.class);
+    doThrow(new FileNotFoundException())
+        .when(mockGcsUtil)
+        .verifyBucketAccessible(any(GcsPath.class));
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectCause(instanceOf(FileNotFoundException.class));
     expectedException.expectMessage("Could not find file gs://non-existent-bucket/location");
     validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
   }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index f965b4d7900..7d190536800 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -744,6 +744,85 @@ public class GcsUtilTest {
             new FastNanoClockAndSleeper()));
   }
 
+  @Test
+  public void testVerifyBucketAccessible() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    gcsUtil.verifyBucketAccessible(
+        GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff,
+        new FastNanoClockAndSleeper());
+  }
+
+  @Test(expected = AccessDeniedException.class)
+  public void testVerifyBucketAccessibleAccessError() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
+    GoogleJsonResponseException expectedException =
+        googleJsonResponseException(
+            HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously",
+            "These aren't the buckets you're looking for");
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(expectedException);
+
+    gcsUtil.verifyBucketAccessible(
+        GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff,
+        new FastNanoClockAndSleeper());
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testVerifyBucketAccessibleDoesNotExist() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+        .thenThrow(
+            googleJsonResponseException(
+                HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see"));
+
+    gcsUtil.verifyBucketAccessible(
+        GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff,
+        new FastNanoClockAndSleeper());
+  }
+
   @Test
   public void testGetBucket() throws IOException {
     GcsOptions pipelineOptions = gcsOptionsWithTestCredential();