You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink and is not preserved in this plain-text version.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/08/17 22:39:48 UTC

[beam] branch master updated: Don't use batch interface for single object operations (#22432)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aa574f5e6e7 Don't use batch interface for single object operations (#22432)
aa574f5e6e7 is described below

commit aa574f5e6e78b311744793701e9f6b6abf04d5a9
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Wed Aug 17 18:39:42 2022 -0400

    Don't use batch interface for single object operations (#22432)
---
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      | 18 ++++-
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  | 86 +++++++++++++++++++++-
 2 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index c5d92a1a35c..2acec4f2386 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -361,6 +361,22 @@ public class GcsUtil {
    * GcsPath GcsPaths}.
    */
   public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException {
+    if (gcsPaths.isEmpty()) {
+      return ImmutableList.of();
+    } else if (gcsPaths.size() == 1) {
+      GcsPath path = gcsPaths.get(0);
+      try {
+        StorageObject object = getObject(path);
+        return ImmutableList.of(StorageObjectOrIOException.create(object));
+      } catch (IOException e) {
+        return ImmutableList.of(StorageObjectOrIOException.create(e));
+      } catch (Exception e) {
+        IOException ioException =
+            new IOException(String.format("Error trying to get %s: %s", path, e));
+        return ImmutableList.of(StorageObjectOrIOException.create(ioException));
+      }
+    }
+
     List<StorageObjectOrIOException[]> results = new ArrayList<>();
     executeBatches(makeGetBatches(gcsPaths, results));
     ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder();
@@ -749,7 +765,7 @@ public class GcsUtil {
 
     List<CompletionStage<Void>> futures = new ArrayList<>();
     for (final BatchInterface batch : batches) {
-      futures.add(MoreFutures.runAsync(() -> batch.execute(), executor));
+      futures.add(MoreFutures.runAsync(batch::execute, executor));
     }
 
     try {
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 8c02d219858..33a87c6d0ee 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -476,6 +476,16 @@ public class GcsUtilTest {
 
     String content =
         contentBoundaryLine
+            + "\n"
+            + "Content-Type: application/http\n"
+            + "\n"
+            + "HTTP/1.1 404 Not Found\n"
+            + "Content-Length: -1\n"
+            + "\n"
+            + error.toString()
+            + "\n"
+            + "\n"
+            + contentBoundaryLine
             + "\n"
             + "Content-Type: application/http\n"
             + "\n"
@@ -499,6 +509,27 @@ public class GcsUtilTest {
 
     GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
 
+    gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
+    gcsUtil.fileSizes(
+        ImmutableList.of(
+            GcsPath.fromComponents("testbucket", "testobject"),
+            GcsPath.fromComponents("testbucket", "testobject2")));
+  }
+
+  @Test
+  public void testGetSizeBytesWhenFileNotFoundNoBatch() throws Exception {
+    thrown.expect(FileNotFoundException.class);
+    MockLowLevelHttpResponse notFoundResponse =
+        new MockLowLevelHttpResponse()
+            .setContentType("text/plain")
+            .setContent("error")
+            .setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+
+    MockHttpTransport mockTransport =
+        new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
     gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
     gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
   }
@@ -525,8 +556,18 @@ public class GcsUtilTest {
             + error.toString()
             + "\n"
             + "\n"
-            + endOfContentBoundaryLine
-            + "\n";
+            + contentBoundaryLine
+            + "\n"
+            + "Content-Type: application/http\n"
+            + "\n"
+            + "HTTP/1.1 404 Not Found\n"
+            + "Content-Length: -1\n"
+            + "\n"
+            + error.toString()
+            + "\n"
+            + "\n"
+            + endOfContentBoundaryLine;
+
     thrown.expect(FileNotFoundException.class);
 
     final LowLevelHttpResponse[] mockResponses =
@@ -559,6 +600,47 @@ public class GcsUtilTest {
 
     GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
 
+    gcsUtil.setStorageClient(
+        new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
+    gcsUtil.fileSizes(
+        ImmutableList.of(
+            GcsPath.fromComponents("testbucket", "testobject"),
+            GcsPath.fromComponents("testbucket", "testobject2")));
+  }
+
+  @Test
+  public void testGetSizeBytesWhenFileNotFoundNoBatchRetry() throws Exception {
+    thrown.expect(FileNotFoundException.class);
+
+    final LowLevelHttpResponse[] mockResponses =
+        new LowLevelHttpResponse[] {
+          Mockito.mock(LowLevelHttpResponse.class), Mockito.mock(LowLevelHttpResponse.class),
+        };
+    when(mockResponses[0].getContentType()).thenReturn("text/plain");
+    when(mockResponses[1].getContentType()).thenReturn("text/plain");
+
+    // 429: Too many requests, then 200: OK.
+    when(mockResponses[0].getStatusCode()).thenReturn(429);
+    when(mockResponses[1].getStatusCode()).thenReturn(404);
+    when(mockResponses[0].getContent()).thenReturn(toStream("error"));
+    when(mockResponses[1].getContent()).thenReturn(toStream("error"));
+
+    // A mock transport that lets us mock the API responses.
+    MockHttpTransport mockTransport =
+        new MockHttpTransport.Builder()
+            .setLowLevelHttpRequest(
+                new MockLowLevelHttpRequest() {
+                  int index = 0;
+
+                  @Override
+                  public LowLevelHttpResponse execute() throws IOException {
+                    return mockResponses[index++];
+                  }
+                })
+            .build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
     gcsUtil.setStorageClient(
         new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
     gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));