You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/12/16 00:49:21 UTC
[1/2] incubator-beam git commit: [BEAM-1153] GcsUtil: use non-batch API for single file size requests.
Repository: incubator-beam
Updated Branches:
refs/heads/release-0.4.0-incubating f753422a3 -> bce0a0d50
[BEAM-1153] GcsUtil: use non-batch API for single file size requests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58601f8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58601f8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58601f8c
Branch: refs/heads/release-0.4.0-incubating
Commit: 58601f8c69b97dbdd9087b27c356c50bca7a1c8b
Parents: f753422
Author: Pei He <pe...@google.com>
Authored: Tue Dec 13 18:29:17 2016 -0800
Committer: Pei He <pe...@google.com>
Committed: Thu Dec 15 16:30:54 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 29 ++++++++-
.../org/apache/beam/sdk/util/GcsUtilTest.java | 65 +++++++++++++++++++-
2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58601f8c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 2edb1d6..dcdba46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -267,7 +267,34 @@ public class GcsUtil {
* if the resource does not exist.
*/
public long fileSize(GcsPath path) throws IOException {
- return fileSizes(ImmutableList.of(path)).get(0);
+ return fileSize(
+ path,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT);
+ }
+
+ /**
+ * Returns the file size from GCS or throws {@link FileNotFoundException}
+ * if the resource does not exist.
+ */
+ @VisibleForTesting
+ long fileSize(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+ Storage.Objects.Get getObject =
+ storageClient.objects().get(path.getBucket(), path.getObject());
+ try {
+ StorageObject object = ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(getObject),
+ backoff,
+ RetryDeterminer.SOCKET_ERRORS,
+ IOException.class,
+ sleeper);
+ return object.getSize().longValue();
+ } catch (Exception e) {
+ if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
+ throw new FileNotFoundException(path.toString());
+ }
+ throw new IOException("Unable to get file size", e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58601f8c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index c8ed402..6ca87f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -57,6 +57,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.math.BigInteger;
import java.net.SocketTimeoutException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.AccessDeniedException;
@@ -320,7 +321,69 @@ public class GcsUtilTest {
}
@Test
- public void testGetSizeBytesWhenFileNotFound() throws Exception {
+ public void testFileSizeNonBatch() throws Exception {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+ Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+ when(mockStorage.objects()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
+ when(mockStorageGet.execute()).thenReturn(
+ new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+ assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")));
+ }
+
+ @Test
+ public void testFileSizeWhenFileNotFoundNonBatch() throws Exception {
+ MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse();
+ notFoundResponse.setContent("");
+ notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+
+ MockHttpTransport mockTransport =
+ new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
+
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
+
+ thrown.expect(FileNotFoundException.class);
+ gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
+ }
+
+ @Test
+ public void testRetryFileSizeNonBatch() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+ Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+ BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
+
+ when(mockStorage.objects()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
+ when(mockStorageGet.execute())
+ .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+ assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"),
+ mockBackOff, new FastNanoClockAndSleeper()));
+ assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception {
JsonFactory jsonFactory = new JacksonFactory();
String contentBoundary = "batch_foobarbaz";
[2/2] incubator-beam git commit: This closes #1637
Posted by da...@apache.org.
This closes #1637
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bce0a0d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bce0a0d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bce0a0d5
Branch: refs/heads/release-0.4.0-incubating
Commit: bce0a0d50bdb4e560790c8292f031f5757a9bb8c
Parents: f753422 58601f8
Author: Davor Bonaci <da...@google.com>
Authored: Thu Dec 15 16:49:07 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Thu Dec 15 16:49:07 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 29 ++++++++-
.../org/apache/beam/sdk/util/GcsUtilTest.java | 65 +++++++++++++++++++-
2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------