You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/31 16:48:29 UTC
[beam] branch master updated: Merge pull request #17683 from [BEAM-14475] add test cases to GcsUtil
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 06a7dce4811 Merge pull request #17683 from [BEAM-14475] add test cases to GcsUtil
06a7dce4811 is described below
commit 06a7dce48115b3be6b15c81d1b4e503ff1a7f4bd
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Tue May 31 12:48:21 2022 -0400
Merge pull request #17683 from [BEAM-14475] add test cases to GcsUtil
* [BEAM-14475] add test cases to GcsUtil
* [BEAM-14475] run spotless
* [BEAM-14475] add braces
---
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 12 +-
.../beam/sdk/extensions/gcp/util/GcsUtilTest.java | 152 +++++++++++++++++++++
2 files changed, 161 insertions(+), 3 deletions(-)
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 9e4e51680e1..82bbdb181c1 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -200,7 +200,8 @@ public class GcsUtil {
return GLOB_PREFIX.matcher(spec.getObject()).matches();
}
- private GcsUtil(
+ @VisibleForTesting
+ GcsUtil(
Storage storageClient,
HttpRequestInitializer httpRequestInitializer,
ExecutorService executorService,
@@ -220,7 +221,7 @@ public class GcsUtil {
.setGrpcEnabled(shouldUseGrpc)
.build();
googleCloudStorage =
- new GoogleCloudStorageImpl(googleCloudStorageOptions, storageClient, credentials);
+ createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
this.batchRequestSupplier =
() -> {
// Capture reference to this so that the most recent storageClient and initializer
@@ -557,7 +558,7 @@ public class GcsUtil {
GoogleCloudStorageOptions newGoogleCloudStorageOptions =
googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
GoogleCloudStorage gcpStorage =
- new GoogleCloudStorageImpl(
+ createGoogleCloudStorage(
newGoogleCloudStorageOptions, this.storageClient, this.credentials);
StorageResourceId resourceId =
new StorageResourceId(
@@ -599,6 +600,11 @@ public class GcsUtil {
}
}
+ /**
+  * Builds a {@link GoogleCloudStorage} client from the given options, storage client and
+  * credentials.
+  *
+  * <p>Kept as a separate, package-private method so that tests can override it and supply their
+  * own {@link GoogleCloudStorage}. NOTE(review): the constructor appears to invoke this method
+  * too, so an override must tolerate being called before the subclass's own fields are set.
+  */
+ @VisibleForTesting
+ GoogleCloudStorage createGoogleCloudStorage(
+     GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
+   return new GoogleCloudStorageImpl(options, storage, credentials);
+ }
+
/** Returns whether the GCS bucket exists and is accessible. */
public boolean bucketAccessible(GcsPath path) throws IOException {
return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 7856ddd175b..f965b4d7900 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.gcp.util;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
@@ -39,6 +40,7 @@ import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest;
import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
@@ -58,6 +60,7 @@ import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.RewriteResponse;
import com.google.api.services.storage.model.StorageObject;
+import com.google.auth.Credentials;
import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
@@ -70,6 +73,7 @@ import java.io.InputStream;
import java.math.BigInteger;
import java.net.SocketTimeoutException;
import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
@@ -90,15 +94,18 @@ import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.BatchInterface;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.RewriteOp;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -1219,6 +1226,151 @@ public class GcsUtilTest {
assertEquals(501, results.size());
}
+ /** Verifies that getObjects returns the StorageObject produced by the mocked storage client. */
+ @Test
+ public void testGetObjects() throws IOException {
+   // Wire a mocked client chain: storage.objects().get(..).execute() -> expected object.
+   Storage storageClient = Mockito.mock(Storage.class);
+   Storage.Objects objectsApi = Mockito.mock(Storage.Objects.class);
+   Storage.Objects.Get getRequest = Mockito.mock(Storage.Objects.Get.class);
+   StorageObject expected = new StorageObject();
+   when(storageClient.objects()).thenReturn(objectsApi);
+   when(objectsApi.get(any(), any())).thenReturn(getRequest);
+   when(getRequest.execute()).thenReturn(expected);
+
+   GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+   gcsUtil.setStorageClient(storageClient);
+   gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+
+   List<StorageObjectOrIOException> fetched = gcsUtil.getObjects(makeGcsPaths("s", 1));
+
+   assertEquals(expected, fetched.get(0).storageObject());
+ }
+
+ /** Verifies that a failing get request is reported through the result's ioException. */
+ @Test
+ public void testGetObjectsWithException() throws IOException {
+   // Wire a mocked client chain whose execute() call fails.
+   Storage storageClient = Mockito.mock(Storage.class);
+   Storage.Objects objectsApi = Mockito.mock(Storage.Objects.class);
+   Storage.Objects.Get getRequest = Mockito.mock(Storage.Objects.Get.class);
+   when(storageClient.objects()).thenReturn(objectsApi);
+   when(objectsApi.get(any(), any())).thenReturn(getRequest);
+   when(getRequest.execute()).thenThrow(new RuntimeException("fakeException"));
+
+   GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+   gcsUtil.setStorageClient(storageClient);
+   gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+
+   thrown.expect(IOException.class);
+   thrown.expectMessage("Error trying to get gs://bucket/s0");
+
+   List<StorageObjectOrIOException> fetched = gcsUtil.getObjects(makeGcsPaths("s", 1));
+
+   // Rethrow the first recorded failure so the expectations set on `thrown` are checked.
+   for (StorageObjectOrIOException entry : fetched) {
+     IOException failure = entry.ioException();
+     if (failure != null) {
+       throw failure;
+     }
+   }
+ }
+
+ /** Verifies that a failing list request is reported as an IOException naming the bucket. */
+ @Test
+ public void testListObjectsException() throws IOException {
+   // Wire a mocked client chain whose list(..).execute() call fails.
+   Storage storageClient = Mockito.mock(Storage.class);
+   Storage.Objects objectsApi = Mockito.mock(Storage.Objects.class);
+   Storage.Objects.List listRequest = Mockito.mock(Storage.Objects.List.class);
+   when(storageClient.objects()).thenReturn(objectsApi);
+   when(objectsApi.list(any())).thenReturn(listRequest);
+   when(listRequest.execute()).thenThrow(new RuntimeException("FakeException"));
+
+   GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+   gcsUtil.setStorageClient(storageClient);
+   gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+
+   thrown.expect(IOException.class);
+   thrown.expectMessage("Unable to match files in bucket testBucket");
+
+   gcsUtil.listObjects("testBucket", "prefix", null);
+ }
+
+ /**
+  * Test double for {@link GcsUtil} whose {@link #createGoogleCloudStorage} returns the
+  * {@link GoogleCloudStorage} stored in {@link #googleCloudStorage} instead of building a real
+  * client.
+  *
+  * <p>NOTE(review): the GcsUtil constructor appears to call createGoogleCloudStorage as well; at
+  * that point this field has not been assigned yet, so the override returns null during
+  * construction. Tests are expected to set the field before exercising the util.
+  */
+ public static class GcsUtilMock extends GcsUtil {
+
+   /**
+    * Storage handed back by {@link #createGoogleCloudStorage}; stays null until a test assigns
+    * it.
+    */
+   public GoogleCloudStorage googleCloudStorage;
+
+   /** Builds a mock util from the given pipeline options. */
+   public static GcsUtilMock createMock(PipelineOptions options) {
+     GcsOptions gcsOpts = options.as(GcsOptions.class);
+     Storage.Builder builder = Transport.newStorageClient(gcsOpts);
+     Storage client = builder.build();
+     HttpRequestInitializer initializer = builder.getHttpRequestInitializer();
+     return new GcsUtilMock(
+         client,
+         initializer,
+         gcsOpts.getExecutorService(),
+         hasExperiment(options, "use_grpc_for_gcs"),
+         gcsOpts.getGcpCredential(),
+         gcsOpts.getGcsUploadBufferSizeBytes());
+   }
+
+   /** Forwards every argument unchanged to the {@link GcsUtil} constructor. */
+   private GcsUtilMock(
+       Storage storageClient,
+       HttpRequestInitializer httpRequestInitializer,
+       ExecutorService executorService,
+       Boolean shouldUseGrpc,
+       Credentials credentials,
+       @Nullable Integer uploadBufferSizeBytes) {
+     super(
+         storageClient,
+         httpRequestInitializer,
+         executorService,
+         shouldUseGrpc,
+         credentials,
+         uploadBufferSizeBytes);
+   }
+
+   /** Ignores all arguments and returns whatever is currently in {@link #googleCloudStorage}. */
+   @Override
+   GoogleCloudStorage createGoogleCloudStorage(
+       GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
+     return googleCloudStorage;
+   }
+ }
+
+ /** Verifies that create returns the channel produced by the injected GoogleCloudStorage. */
+ @Test
+ public void testCreate() throws IOException {
+   GoogleCloudStorage fakeStorage = Mockito.mock(GoogleCloudStorage.class);
+   WritableByteChannel expectedChannel = Mockito.mock(WritableByteChannel.class);
+   when(fakeStorage.create(any(), any())).thenReturn(expectedChannel);
+
+   GcsUtilMock gcsUtil = GcsUtilMock.createMock(gcsOptionsWithTestCredential());
+   // Inject the fake so createGoogleCloudStorage hands it back.
+   gcsUtil.googleCloudStorage = fakeStorage;
+
+   GcsPath target = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
+   CreateOptions defaults = CreateOptions.builder().build();
+
+   assertEquals(expectedChannel, gcsUtil.create(target, defaults));
+ }
+
+ /** Verifies that a RuntimeException thrown by the injected storage propagates out of create. */
+ @Test
+ public void testCreateWithException() throws IOException {
+   GoogleCloudStorage failingStorage = Mockito.mock(GoogleCloudStorage.class);
+   when(failingStorage.create(any(), any())).thenThrow(new RuntimeException("testException"));
+
+   GcsUtilMock gcsUtil = GcsUtilMock.createMock(gcsOptionsWithTestCredential());
+   // Inject the failing fake so createGoogleCloudStorage hands it back.
+   gcsUtil.googleCloudStorage = failingStorage;
+
+   GcsPath target = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
+   CreateOptions defaults = CreateOptions.builder().build();
+
+   thrown.expect(RuntimeException.class);
+   thrown.expectMessage("testException");
+
+   gcsUtil.create(target, defaults);
+ }
+
/** A helper to wrap a {@link GenericJson} object in a content stream. */
private static InputStream toStream(String content) throws IOException {
return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));