You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/31 16:48:29 UTC

[beam] branch master updated: Merge pull request #17683 from [BEAM-14475] add test cases to GcsUtil

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 06a7dce4811 Merge pull request #17683 from [BEAM-14475] add test cases to GcsUtil
06a7dce4811 is described below

commit 06a7dce48115b3be6b15c81d1b4e503ff1a7f4bd
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Tue May 31 12:48:21 2022 -0400

    Merge pull request #17683 from [BEAM-14475] add test cases to GcsUtil
    
    * [BEAM-14475] add test cases to GcsUtil
    
    * [BEAM-14475] run spotless
    
    * [BEAM-14475] add braces
---
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      |  12 +-
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  | 152 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 3 deletions(-)

diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 9e4e51680e1..82bbdb181c1 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -200,7 +200,8 @@ public class GcsUtil {
     return GLOB_PREFIX.matcher(spec.getObject()).matches();
   }
 
-  private GcsUtil(
+  @VisibleForTesting
+  GcsUtil(
       Storage storageClient,
       HttpRequestInitializer httpRequestInitializer,
       ExecutorService executorService,
@@ -220,7 +221,7 @@ public class GcsUtil {
             .setGrpcEnabled(shouldUseGrpc)
             .build();
     googleCloudStorage =
-        new GoogleCloudStorageImpl(googleCloudStorageOptions, storageClient, credentials);
+        createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
     this.batchRequestSupplier =
         () -> {
           // Capture reference to this so that the most recent storageClient and initializer
@@ -557,7 +558,7 @@ public class GcsUtil {
     GoogleCloudStorageOptions newGoogleCloudStorageOptions =
         googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
     GoogleCloudStorage gcpStorage =
-        new GoogleCloudStorageImpl(
+        createGoogleCloudStorage(
             newGoogleCloudStorageOptions, this.storageClient, this.credentials);
     StorageResourceId resourceId =
         new StorageResourceId(
@@ -599,6 +600,11 @@ public class GcsUtil {
     }
   }
 
+  GoogleCloudStorage createGoogleCloudStorage(
+      GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
+    return new GoogleCloudStorageImpl(options, storage, credentials);
+  }
+
   /** Returns whether the GCS bucket exists and is accessible. */
   public boolean bucketAccessible(GcsPath path) throws IOException {
     return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 7856ddd175b..f965b4d7900 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.gcp.util;
 
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
@@ -39,6 +40,7 @@ import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest;
 import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.http.HttpResponse;
 import com.google.api.client.http.HttpStatusCodes;
 import com.google.api.client.http.HttpTransport;
@@ -58,6 +60,7 @@ import com.google.api.services.storage.model.Bucket;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.RewriteResponse;
 import com.google.api.services.storage.model.StorageObject;
+import com.google.auth.Credentials;
 import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
@@ -70,6 +73,7 @@ import java.io.InputStream;
 import java.math.BigInteger;
 import java.net.SocketTimeoutException;
 import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
@@ -90,15 +94,18 @@ import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.BatchInterface;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.RewriteOp;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -1219,6 +1226,151 @@ public class GcsUtilTest {
     assertEquals(501, results.size());
   }
 
+  @Test
+  public void testGetObjects() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockGetRequest = Mockito.mock(Storage.Objects.Get.class);
+    StorageObject object = new StorageObject();
+    when(mockGetRequest.execute()).thenReturn(object);
+    when(mockStorageObjects.get(any(), any())).thenReturn(mockGetRequest);
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+
+    List<StorageObjectOrIOException> results = gcsUtil.getObjects(makeGcsPaths("s", 1));
+
+    assertEquals(object, results.get(0).storageObject());
+  }
+
+  @Test
+  public void testGetObjectsWithException() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockGetRequest = Mockito.mock(Storage.Objects.Get.class);
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get(any(), any())).thenReturn(mockGetRequest);
+    when(mockGetRequest.execute()).thenThrow(new RuntimeException("fakeException"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Error trying to get gs://bucket/s0");
+
+    List<StorageObjectOrIOException> results = gcsUtil.getObjects(makeGcsPaths("s", 1));
+
+    for (StorageObjectOrIOException result : results) {
+      if (null != result.ioException()) {
+        throw result.ioException();
+      }
+    }
+  }
+
+  @Test
+  public void testListObjectsException() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);
+    when(mockStorageObjects.list(any())).thenReturn(mockStorageList);
+    when(mockStorageList.execute()).thenThrow(new RuntimeException("FakeException"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Unable to match files in bucket testBucket");
+
+    gcsUtil.listObjects("testBucket", "prefix", null);
+  }
+
+  public static class GcsUtilMock extends GcsUtil {
+
+    public GoogleCloudStorage googleCloudStorage;
+
+    public static GcsUtilMock createMock(PipelineOptions options) {
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
+      return new GcsUtilMock(
+          storageBuilder.build(),
+          storageBuilder.getHttpRequestInitializer(),
+          gcsOptions.getExecutorService(),
+          hasExperiment(options, "use_grpc_for_gcs"),
+          gcsOptions.getGcpCredential(),
+          gcsOptions.getGcsUploadBufferSizeBytes());
+    }
+
+    private GcsUtilMock(
+        Storage storageClient,
+        HttpRequestInitializer httpRequestInitializer,
+        ExecutorService executorService,
+        Boolean shouldUseGrpc,
+        Credentials credentials,
+        @Nullable Integer uploadBufferSizeBytes) {
+      super(
+          storageClient,
+          httpRequestInitializer,
+          executorService,
+          shouldUseGrpc,
+          credentials,
+          uploadBufferSizeBytes);
+    }
+
+    @Override
+    GoogleCloudStorage createGoogleCloudStorage(
+        GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
+      return googleCloudStorage;
+    }
+  }
+
+  @Test
+  public void testCreate() throws IOException {
+    GcsOptions gcsOptions = gcsOptionsWithTestCredential();
+
+    GcsUtilMock gcsUtil = GcsUtilMock.createMock(gcsOptions);
+
+    GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class);
+    WritableByteChannel mockChannel = Mockito.mock(WritableByteChannel.class);
+
+    gcsUtil.googleCloudStorage = mockStorage;
+
+    when(mockStorage.create(any(), any())).thenReturn(mockChannel);
+
+    GcsPath path = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
+    CreateOptions createOptions = CreateOptions.builder().build();
+
+    assertEquals(mockChannel, gcsUtil.create(path, createOptions));
+  }
+
+  @Test
+  public void testCreateWithException() throws IOException {
+    GcsOptions gcsOptions = gcsOptionsWithTestCredential();
+
+    GcsUtilMock gcsUtil = GcsUtilMock.createMock(gcsOptions);
+
+    GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class);
+
+    gcsUtil.googleCloudStorage = mockStorage;
+
+    when(mockStorage.create(any(), any())).thenThrow(new RuntimeException("testException"));
+
+    GcsPath path = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
+    CreateOptions createOptions = CreateOptions.builder().build();
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("testException");
+
+    gcsUtil.create(path, createOptions);
+  }
+
   /** A helper to wrap a {@link GenericJson} object in a content stream. */
   private static InputStream toStream(String content) throws IOException {
     return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));