You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by da...@apache.org on 2016/12/16 00:49:21 UTC

[1/2] incubator-beam git commit: [BEAM-1153] GcsUtil: use non-batch API for single file size requests.

Repository: incubator-beam
Updated Branches:
  refs/heads/release-0.4.0-incubating f753422a3 -> bce0a0d50


[BEAM-1153] GcsUtil: use non-batch API for single file size requests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58601f8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58601f8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58601f8c

Branch: refs/heads/release-0.4.0-incubating
Commit: 58601f8c69b97dbdd9087b27c356c50bca7a1c8b
Parents: f753422
Author: Pei He <pe...@google.com>
Authored: Tue Dec 13 18:29:17 2016 -0800
Committer: Pei He <pe...@google.com>
Committed: Thu Dec 15 16:30:54 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 29 ++++++++-
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 65 +++++++++++++++++++-
 2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58601f8c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 2edb1d6..dcdba46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -267,7 +267,34 @@ public class GcsUtil {
    * if the resource does not exist.
    */
   public long fileSize(GcsPath path) throws IOException {
-    return fileSizes(ImmutableList.of(path)).get(0);
+    return fileSize(
+        path,
+        BACKOFF_FACTORY.backoff(),
+        Sleeper.DEFAULT);
+  }
+
+  /**
+   * Returns the file size from GCS or throws {@link FileNotFoundException}
+   * if the resource does not exist.
+   */
+  @VisibleForTesting
+  long fileSize(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+    Storage.Objects.Get getObject =
+            storageClient.objects().get(path.getBucket(), path.getObject());
+    try {
+      StorageObject object = ResilientOperation.retry(
+          ResilientOperation.getGoogleRequestCallable(getObject),
+          backoff,
+          RetryDeterminer.SOCKET_ERRORS,
+          IOException.class,
+          sleeper);
+      return object.getSize().longValue();
+    } catch (Exception e) {
+      if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
+        throw new FileNotFoundException(path.toString());
+      }
+      throw new IOException("Unable to get file size", e);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58601f8c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index c8ed402..6ca87f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -57,6 +57,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.net.SocketTimeoutException;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.AccessDeniedException;
@@ -320,7 +321,69 @@ public class GcsUtilTest {
   }
 
   @Test
-  public void testGetSizeBytesWhenFileNotFound() throws Exception {
+  public void testFileSizeNonBatch() throws Exception {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenReturn(
+            new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+    assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")));
+  }
+
+  @Test
+  public void testFileSizeWhenFileNotFoundNonBatch() throws Exception {
+    MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse();
+    notFoundResponse.setContent("");
+    notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+
+    MockHttpTransport mockTransport =
+            new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
+
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
+
+    thrown.expect(FileNotFoundException.class);
+    gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
+  }
+
+  @Test
+  public void testRetryFileSizeNonBatch() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+            .thenThrow(new SocketTimeoutException("SocketException"))
+            .thenThrow(new SocketTimeoutException("SocketException"))
+            .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+    assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"),
+            mockBackOff, new FastNanoClockAndSleeper()));
+    assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception {
     JsonFactory jsonFactory = new JacksonFactory();
 
     String contentBoundary = "batch_foobarbaz";


[2/2] incubator-beam git commit: This closes #1637

Posted by da...@apache.org.
This closes #1637


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bce0a0d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bce0a0d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bce0a0d5

Branch: refs/heads/release-0.4.0-incubating
Commit: bce0a0d50bdb4e560790c8292f031f5757a9bb8c
Parents: f753422 58601f8
Author: Davor Bonaci <da...@google.com>
Authored: Thu Dec 15 16:49:07 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Thu Dec 15 16:49:07 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 29 ++++++++-
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 65 +++++++++++++++++++-
 2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------