You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/08/17 22:39:48 UTC
[beam] branch master updated: Don't use batch interface for single object operations (#22432)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aa574f5e6e7 Don't use batch interface for single object operations (#22432)
aa574f5e6e7 is described below
commit aa574f5e6e78b311744793701e9f6b6abf04d5a9
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Wed Aug 17 18:39:42 2022 -0400
Don't use batch interface for single object operations (#22432)
---
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 18 ++++-
.../beam/sdk/extensions/gcp/util/GcsUtilTest.java | 86 +++++++++++++++++++++-
2 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index c5d92a1a35c..2acec4f2386 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -361,6 +361,22 @@ public class GcsUtil {
* GcsPath GcsPaths}.
*/
public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException {
+ if (gcsPaths.isEmpty()) {
+ return ImmutableList.of();
+ } else if (gcsPaths.size() == 1) {
+ GcsPath path = gcsPaths.get(0);
+ try {
+ StorageObject object = getObject(path);
+ return ImmutableList.of(StorageObjectOrIOException.create(object));
+ } catch (IOException e) {
+ return ImmutableList.of(StorageObjectOrIOException.create(e));
+ } catch (Exception e) {
+ IOException ioException =
+ new IOException(String.format("Error trying to get %s: %s", path, e));
+ return ImmutableList.of(StorageObjectOrIOException.create(ioException));
+ }
+ }
+
List<StorageObjectOrIOException[]> results = new ArrayList<>();
executeBatches(makeGetBatches(gcsPaths, results));
ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder();
@@ -749,7 +765,7 @@ public class GcsUtil {
List<CompletionStage<Void>> futures = new ArrayList<>();
for (final BatchInterface batch : batches) {
- futures.add(MoreFutures.runAsync(() -> batch.execute(), executor));
+ futures.add(MoreFutures.runAsync(batch::execute, executor));
}
try {
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 8c02d219858..33a87c6d0ee 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -476,6 +476,16 @@ public class GcsUtilTest {
String content =
contentBoundaryLine
+ + "\n"
+ + "Content-Type: application/http\n"
+ + "\n"
+ + "HTTP/1.1 404 Not Found\n"
+ + "Content-Length: -1\n"
+ + "\n"
+ + error.toString()
+ + "\n"
+ + "\n"
+ + contentBoundaryLine
+ "\n"
+ "Content-Type: application/http\n"
+ "\n"
@@ -499,6 +509,27 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+ gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
+ gcsUtil.fileSizes(
+ ImmutableList.of(
+ GcsPath.fromComponents("testbucket", "testobject"),
+ GcsPath.fromComponents("testbucket", "testobject2")));
+ }
+
+ @Test
+ public void testGetSizeBytesWhenFileNotFoundNoBatch() throws Exception {
+ thrown.expect(FileNotFoundException.class);
+ MockLowLevelHttpResponse notFoundResponse =
+ new MockLowLevelHttpResponse()
+ .setContentType("text/plain")
+ .setContent("error")
+ .setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+
+ MockHttpTransport mockTransport =
+ new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
+
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
}
@@ -525,8 +556,18 @@ public class GcsUtilTest {
+ error.toString()
+ "\n"
+ "\n"
- + endOfContentBoundaryLine
- + "\n";
+ + contentBoundaryLine
+ + "\n"
+ + "Content-Type: application/http\n"
+ + "\n"
+ + "HTTP/1.1 404 Not Found\n"
+ + "Content-Length: -1\n"
+ + "\n"
+ + error.toString()
+ + "\n"
+ + "\n"
+ + endOfContentBoundaryLine;
+
thrown.expect(FileNotFoundException.class);
final LowLevelHttpResponse[] mockResponses =
@@ -559,6 +600,47 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+ gcsUtil.setStorageClient(
+ new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
+ gcsUtil.fileSizes(
+ ImmutableList.of(
+ GcsPath.fromComponents("testbucket", "testobject"),
+ GcsPath.fromComponents("testbucket", "testobject2")));
+ }
+
+ @Test
+ public void testGetSizeBytesWhenFileNotFoundNoBatchRetry() throws Exception {
+ thrown.expect(FileNotFoundException.class);
+
+ final LowLevelHttpResponse[] mockResponses =
+ new LowLevelHttpResponse[] {
+ Mockito.mock(LowLevelHttpResponse.class), Mockito.mock(LowLevelHttpResponse.class),
+ };
+ when(mockResponses[0].getContentType()).thenReturn("text/plain");
+ when(mockResponses[1].getContentType()).thenReturn("text/plain");
+
+    // 429: Too many requests, then 404: Not Found.
+ when(mockResponses[0].getStatusCode()).thenReturn(429);
+ when(mockResponses[1].getStatusCode()).thenReturn(404);
+ when(mockResponses[0].getContent()).thenReturn(toStream("error"));
+ when(mockResponses[1].getContent()).thenReturn(toStream("error"));
+
+ // A mock transport that lets us mock the API responses.
+ MockHttpTransport mockTransport =
+ new MockHttpTransport.Builder()
+ .setLowLevelHttpRequest(
+ new MockLowLevelHttpRequest() {
+ int index = 0;
+
+ @Override
+ public LowLevelHttpResponse execute() throws IOException {
+ return mockResponses[index++];
+ }
+ })
+ .build();
+
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
gcsUtil.setStorageClient(
new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));