You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/08/05 18:49:34 UTC
[beam] branch master updated: Add support when writing to locked buckets by handling retentionPolicyNotMet error (#22138)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d0e523b8e9d Add support when writing to locked buckets by handling retentionPolicyNotMet error (#22138)
d0e523b8e9d is described below
commit d0e523b8e9da3a31bc3eb4dc706895638ec86b96
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Fri Aug 5 14:49:28 2022 -0400
Add support when writing to locked buckets by handling retentionPolicyNotMet error (#22138)
* handling retentionPolicyNotMet error
* draft test
* 1) deleting source when appropriate. 2) test retention error files are identical
* added test to verify valid retentionPolicyNotMet errors are still being thrown
* checking for nullness
---
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 28 ++++++++
.../beam/sdk/extensions/gcp/util/GcsUtilTest.java | 78 ++++++++++++++++++++++
2 files changed, 106 insertions(+)
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 30ce9dbd97e..c5d92a1a35c 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -57,6 +57,7 @@ import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -915,6 +916,33 @@ public class GcsUtil {
} else {
throw new FileNotFoundException(e.getMessage());
}
+ } else if (e.getCode() == 403
+ && e.getErrors().size() == 1
+ && e.getErrors().get(0).getReason().equals("retentionPolicyNotMet")) {
+ List<StorageObjectOrIOException> srcAndDestObjects = getObjects(Arrays.asList(from, to));
+ String srcHash = srcAndDestObjects.get(0).storageObject().getMd5Hash();
+ String destHash = srcAndDestObjects.get(1).storageObject().getMd5Hash();
+ if (srcHash != null && srcHash.equals(destHash)) {
+ // Source and destination are identical. Treat this as a successful rewrite
+ LOG.warn(
+ "Caught retentionPolicyNotMet error while rewriting to a bucket with retention "
+ + "policy. Skipping because destination {} and source {} are considered identical "
+ + "because their MD5 Hashes are equal.",
+ getFrom(),
+ getTo());
+
+ if (deleteSource) {
+ readyToEnqueue = true;
+ performDelete = true;
+ } else {
+ readyToEnqueue = false;
+ }
+ lastError = null;
+ } else {
+ // User is attempting to write to a file that hasn't met its retention policy yet.
+ // Not a transient error so likely will not be fixed by a retry
+ throw new IOException(e.getMessage());
+ }
} else {
lastError = e;
readyToEnqueue = true;
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 7d190536800..8c02d219858 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -1099,6 +1099,8 @@ public class GcsUtilTest {
GoogleJsonError error = new GoogleJsonError();
error.setCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
cb.onFailure(error, null);
+ } catch (GoogleJsonResponseException e) {
+ cb.onFailure(e.getDetails(), null);
} catch (Exception e) {
System.out.println("Propagating exception as server error " + e);
e.printStackTrace();
@@ -1259,6 +1261,82 @@ public class GcsUtilTest {
StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS));
}
+ @Test
+ public void testThrowRetentionPolicyNotMetErrorWhenUnequalChecksum() throws IOException {
+ // ./gradlew sdks:java:extensions:google-cloud-platform-core:test --tests
+ // org.apache.beam.sdk.extensions.gcp.util.GcsUtilTest.testThrowRetentionPolicyNotMetErrorWhenUnequalChecksum
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+ Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+ Storage.Objects.Get mockGetRequest1 = Mockito.mock(Storage.Objects.Get.class);
+ Storage.Objects.Get mockGetRequest2 = Mockito.mock(Storage.Objects.Get.class);
+ Storage.Objects.Rewrite mockStorageRewrite = Mockito.mock(Storage.Objects.Rewrite.class);
+
+ // Source and destination report different MD5 hashes, so the rewrite failure must be thrown.
+ StorageObject srcObject = new StorageObject().setMd5Hash("a");
+ StorageObject destObject = new StorageObject().setMd5Hash("b");
+
+ when(mockStorage.objects()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
+ .thenReturn(mockStorageRewrite);
+ when(mockStorageRewrite.execute())
+ .thenThrow(googleJsonResponseException(403, "retentionPolicyNotMet", "Too soon"));
+ when(mockStorageObjects.get("bucket", "s0")).thenReturn(mockGetRequest1);
+ when(mockGetRequest1.execute()).thenReturn(srcObject);
+ when(mockStorageObjects.get("bucket", "d0")).thenReturn(mockGetRequest2);
+ when(mockGetRequest2.execute()).thenReturn(destObject);
+
+ assertThrows(IOException.class, () -> gcsUtil.rename(makeStrings("s", 1), makeStrings("d", 1)));
+
+ verify(mockStorageRewrite, times(1)).execute(); // the rejected rewrite is attempted only once
+ }
+
+ @Test
+ public void testIgnoreRetentionPolicyNotMetErrorWhenEqualChecksum() throws IOException {
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+ Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+ Storage.Objects.Get mockGetRequest = Mockito.mock(Storage.Objects.Get.class);
+ Storage.Objects.Rewrite mockStorageRewrite1 = Mockito.mock(Storage.Objects.Rewrite.class);
+ Storage.Objects.Rewrite mockStorageRewrite2 = Mockito.mock(Storage.Objects.Rewrite.class);
+ Storage.Objects.Delete mockStorageDelete = Mockito.mock(Storage.Objects.Delete.class);
+
+ // Single GCS object returned by every get(), so source and destination MD5 hashes match.
+ StorageObject gcsObject = new StorageObject().setMd5Hash("a");
+
+ when(mockStorage.objects()).thenReturn(mockStorageObjects);
+ // First rewrite (s0 -> d0) fails with a 403 retentionPolicyNotMet error.
+ when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
+ .thenReturn(mockStorageRewrite1);
+ when(mockStorageRewrite1.execute())
+ .thenThrow(googleJsonResponseException(403, "retentionPolicyNotMet", "Too soon"));
+ when(mockStorageObjects.get(any(), any())) // hash lookup made while handling the rewrite failure
+ .thenReturn(mockGetRequest);
+ when(mockGetRequest.execute())
+ .thenReturn(gcsObject); // both source and destination will get the same hash
+ when(mockStorageObjects.delete("bucket", "s0")).thenReturn(mockStorageDelete);
+
+ // Second rewrite (s1 -> d1) succeeds and must not be affected by the first one's failure.
+ when(mockStorageObjects.rewrite("bucket", "s1", "bucket", "d1", null))
+ .thenReturn(mockStorageRewrite2);
+ when(mockStorageRewrite2.execute()).thenReturn(new RewriteResponse().setDone(true));
+ when(mockStorageObjects.delete("bucket", "s1")).thenReturn(mockStorageDelete);
+
+ gcsUtil.rename(makeStrings("s", 2), makeStrings("d", 2)); // must complete without throwing
+
+ verify(mockStorageRewrite1, times(1)).execute();
+ verify(mockStorageRewrite2, times(1)).execute();
+ verify(mockStorageDelete, times(2)).execute(); // s0 is deleted too, although its rewrite was rejected
+ }
+
@Test
public void testMakeRemoveBatches() throws IOException {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();