You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/08/05 18:49:34 UTC

[beam] branch master updated: Add support when writing to locked buckets by handling retentionPolicyNotMet error (#22138)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e523b8e9d Add support when writing to locked buckets by handling retentionPolicyNotMet error (#22138)
d0e523b8e9d is described below

commit d0e523b8e9da3a31bc3eb4dc706895638ec86b96
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Fri Aug 5 14:49:28 2022 -0400

    Add support when writing to locked buckets by handling retentionPolicyNotMet error (#22138)
    
    * handling retentionPolicyNotMet error
    
    * draft test
    
    * 1) deleting source when appropriate. 2) test retention error files are identical
    
    * added test to verify valid retentionPolicyNotMet errors are still being thrown
    
    * checking for nullness
---
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      | 28 ++++++++
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  | 78 ++++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 30ce9dbd97e..c5d92a1a35c 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -57,6 +57,7 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.FileAlreadyExistsException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -915,6 +916,33 @@ public class GcsUtil {
         } else {
           throw new FileNotFoundException(e.getMessage());
         }
+      } else if (e.getCode() == 403
+          && e.getErrors().size() == 1
+          && e.getErrors().get(0).getReason().equals("retentionPolicyNotMet")) {
+        List<StorageObjectOrIOException> srcAndDestObjects = getObjects(Arrays.asList(from, to));
+        String srcHash = srcAndDestObjects.get(0).storageObject().getMd5Hash();
+        String destHash = srcAndDestObjects.get(1).storageObject().getMd5Hash();
+        if (srcHash != null && srcHash.equals(destHash)) {
+          // Source and destination are identical. Treat this as a successful rewrite
+          LOG.warn(
+              "Caught retentionPolicyNotMet error while rewriting to a bucket with retention "
+                  + "policy. Skipping because destination {} and source {} are considered identical "
+                  + "because their MD5 Hashes are equal.",
+              getFrom(),
+              getTo());
+
+          if (deleteSource) {
+            readyToEnqueue = true;
+            performDelete = true;
+          } else {
+            readyToEnqueue = false;
+          }
+          lastError = null;
+        } else {
+          // User is attempting to write to a file that hasn't met its retention policy yet.
+          // Not a transient error so likely will not be fixed by a retry
+          throw new IOException(e.getMessage());
+        }
       } else {
         lastError = e;
         readyToEnqueue = true;
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 7d190536800..8c02d219858 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -1099,6 +1099,8 @@ public class GcsUtilTest {
                 GoogleJsonError error = new GoogleJsonError();
                 error.setCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
                 cb.onFailure(error, null);
+              } catch (GoogleJsonResponseException e) {
+                cb.onFailure(e.getDetails(), null);
               } catch (Exception e) {
                 System.out.println("Propagating exception as server error " + e);
                 e.printStackTrace();
@@ -1259,6 +1261,82 @@ public class GcsUtilTest {
                 StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS));
   }
 
+  @Test
+  public void testThrowRetentionPolicyNotMetErrorWhenUnequalChecksum() throws IOException {
+    // To run: ./gradlew sdks:java:extensions:google-cloud-platform-core:test --tests
+    // '*GcsUtilTest.testThrowRetentionPolicyNotMetErrorWhenUnequalChecksum'
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockGetRequest1 = Mockito.mock(Storage.Objects.Get.class);
+    Storage.Objects.Get mockGetRequest2 = Mockito.mock(Storage.Objects.Get.class);
+    Storage.Objects.Rewrite mockStorageRewrite = Mockito.mock(Storage.Objects.Rewrite.class);
+
+    // Source and destination objects with different MD5 hashes, returned by the get() stubs below.
+    StorageObject srcObject = new StorageObject().setMd5Hash("a");
+    StorageObject destObject = new StorageObject().setMd5Hash("b");
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
+        .thenReturn(mockStorageRewrite);
+    when(mockStorageRewrite.execute())
+        .thenThrow(googleJsonResponseException(403, "retentionPolicyNotMet", "Too soon"));
+    when(mockStorageObjects.get("bucket", "s0")).thenReturn(mockGetRequest1);
+    when(mockGetRequest1.execute()).thenReturn(srcObject);
+    when(mockStorageObjects.get("bucket", "d0")).thenReturn(mockGetRequest2);
+    when(mockGetRequest2.execute()).thenReturn(destObject);
+
+    assertThrows(IOException.class, () -> gcsUtil.rename(makeStrings("s", 1), makeStrings("d", 1)));
+
+    verify(mockStorageRewrite, times(1)).execute(); // the rewrite is attempted exactly once
+  }
+
+  @Test
+  public void testIgnoreRetentionPolicyNotMetErrorWhenEqualChecksum() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockGetRequest = Mockito.mock(Storage.Objects.Get.class);
+    Storage.Objects.Rewrite mockStorageRewrite1 = Mockito.mock(Storage.Objects.Rewrite.class);
+    Storage.Objects.Rewrite mockStorageRewrite2 = Mockito.mock(Storage.Objects.Rewrite.class);
+    Storage.Objects.Delete mockStorageDelete = Mockito.mock(Storage.Objects.Delete.class);
+
+    // GCS object returned for every get(), so source and destination report the same MD5 hash.
+    StorageObject gcsObject = new StorageObject().setMd5Hash("a");
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    // First rewrite fails with a 403 retentionPolicyNotMet error.
+    when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
+        .thenReturn(mockStorageRewrite1);
+    when(mockStorageRewrite1.execute())
+        .thenThrow(googleJsonResponseException(403, "retentionPolicyNotMet", "Too soon"));
+    when(mockStorageObjects.get(any(), any())) // any object lookup yields the same get request
+        .thenReturn(mockGetRequest);
+    when(mockGetRequest.execute())
+        .thenReturn(gcsObject); // both source and destination will get the same hash
+    when(mockStorageObjects.delete("bucket", "s0")).thenReturn(mockStorageDelete);
+
+    // Second rewrite completes normally and should not be affected by the first one's error.
+    when(mockStorageObjects.rewrite("bucket", "s1", "bucket", "d1", null))
+        .thenReturn(mockStorageRewrite2);
+    when(mockStorageRewrite2.execute()).thenReturn(new RewriteResponse().setDone(true));
+    when(mockStorageObjects.delete("bucket", "s1")).thenReturn(mockStorageDelete);
+
+    gcsUtil.rename(makeStrings("s", 2), makeStrings("d", 2));
+
+    verify(mockStorageRewrite1, times(1)).execute();
+    verify(mockStorageRewrite2, times(1)).execute();
+    verify(mockStorageDelete, times(2)).execute(); // a delete is executed for both s0 and s1
+  }
+
   @Test
   public void testMakeRemoveBatches() throws IOException {
     GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();