You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/18 20:26:32 UTC
[1/2] beam git commit: GcsUtil: set timeout and retry for BatchRequest with HttpRequestInitializer.
Repository: beam
Updated Branches:
refs/heads/master c53249de4 -> faa2277b5
GcsUtil: set timeout and retry for BatchRequest with HttpRequestInitializer.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97a76d94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97a76d94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97a76d94
Branch: refs/heads/master
Commit: 97a76d941776300ad3f77017869835327776f62e
Parents: c53249d
Author: Pei He <pe...@google.com>
Authored: Tue Dec 13 17:16:12 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 18 12:26:09 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 20 +++++--
.../org/apache/beam/sdk/util/GcsUtilTest.java | 56 ++++++++++++++++++++
2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/97a76d94/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index dcdba46..521673c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -25,6 +25,7 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
@@ -93,8 +94,10 @@ public class GcsUtil {
public GcsUtil create(PipelineOptions options) {
LOG.debug("Creating new GcsUtil");
GcsOptions gcsOptions = options.as(GcsOptions.class);
+ Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
return new GcsUtil(
- Transport.newStorageClient(gcsOptions).build(),
+ storageBuilder.build(),
+ storageBuilder.getHttpRequestInitializer(),
gcsOptions.getExecutorService(),
gcsOptions.getGcsUploadBufferSizeBytes());
}
@@ -132,6 +135,7 @@ public class GcsUtil {
/** Client for the GCS API. */
private Storage storageClient;
+ private final HttpRequestInitializer httpRequestInitializer;
/** Buffer size for GCS uploads (in bytes). */
@Nullable private final Integer uploadBufferSizeBytes;
@@ -156,9 +160,11 @@ public class GcsUtil {
private GcsUtil(
Storage storageClient,
+ HttpRequestInitializer httpRequestInitializer,
ExecutorService executorService,
@Nullable Integer uploadBufferSizeBytes) {
this.storageClient = storageClient;
+ this.httpRequestInitializer = httpRequestInitializer;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
this.executorService = executorService;
}
@@ -526,7 +532,7 @@ public class GcsUtil {
List<BatchRequest> batches = new LinkedList<>();
for (List<GcsPath> filesToGet :
Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
- BatchRequest batch = storageClient.batch();
+ BatchRequest batch = createBatchRequest();
for (GcsPath path : filesToGet) {
results.add(enqueueGetFileSize(path, batch));
}
@@ -548,14 +554,14 @@ public class GcsUtil {
destFilenames.size());
List<BatchRequest> batches = new LinkedList<>();
- BatchRequest batch = storageClient.batch();
+ BatchRequest batch = createBatchRequest();
for (int i = 0; i < srcFilenames.size(); i++) {
final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
enqueueCopy(sourcePath, destPath, batch);
if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
batches.add(batch);
- batch = storageClient.batch();
+ batch = createBatchRequest();
}
}
if (batch.size() > 0) {
@@ -568,7 +574,7 @@ public class GcsUtil {
List<BatchRequest> batches = new LinkedList<>();
for (List<String> filesToDelete :
Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) {
- BatchRequest batch = storageClient.batch();
+ BatchRequest batch = createBatchRequest();
for (String file : filesToDelete) {
enqueueDelete(GcsPath.fromUri(file), batch);
}
@@ -648,6 +654,10 @@ public class GcsUtil {
});
}
+ private BatchRequest createBatchRequest() {
+ return storageClient.batch(httpRequestInitializer);
+ }
+
/**
* Expands glob expressions to regular expressions.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/97a76d94/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 6ca87f9..d592761 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -38,6 +38,7 @@ import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.Json;
import com.google.api.client.json.JsonFactory;
@@ -55,11 +56,14 @@ import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.math.BigInteger;
import java.net.SocketTimeoutException;
import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -415,6 +419,51 @@ public class GcsUtilTest {
}
@Test
+ public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception {
+ JsonFactory jsonFactory = new JacksonFactory();
+
+ String contentBoundary = "batch_foobarbaz";
+
+ GenericJson error = new GenericJson()
+ .set("error", new GenericJson().set("code", 404));
+ error.setFactory(jsonFactory);
+
+ String content = contentBoundary + "\n"
+ + "Content-Type: application/http\n"
+ + "\n"
+ + "HTTP/1.1 404 Not Found\n"
+ + "Content-Length: 105\n"
+ + "\n"
+ + error.toString();
+ thrown.expect(FileNotFoundException.class);
+
+ final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class);
+ when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary);
+
+ // 429: Too many requests, then 200: OK.
+ when(mockResponse.getStatusCode()).thenReturn(429, 200);
+ when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content));
+
+ // A mock transport that lets us mock the API responses.
+ MockHttpTransport mockTransport =
+ new MockHttpTransport.Builder()
+ .setLowLevelHttpRequest(
+ new MockLowLevelHttpRequest() {
+ @Override
+ public LowLevelHttpResponse execute() throws IOException {
+ return mockResponse;
+ }
+ })
+ .build();
+
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+ gcsUtil.setStorageClient(
+ new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
+ gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
+ }
+
+ @Test
public void testCreateBucket() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
@@ -723,4 +772,11 @@ public class GcsUtilTest {
assertThat(sumBatchSizes(batches), equalTo(501));
assertEquals(501, results.size());
}
+
+ /**
+ * A helper to wrap a {@link GenericJson} object in a content stream.
+ */
+ private static InputStream toStream(String content) throws IOException {
+ return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+ }
}
[2/2] beam git commit: This closes #1608
Posted by dh...@apache.org.
This closes #1608
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/faa2277b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/faa2277b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/faa2277b
Branch: refs/heads/master
Commit: faa2277b5d85c848ec935a4a0a1d4ba75291a9bc
Parents: c53249d 97a76d9
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 18 12:26:19 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 18 12:26:19 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 20 +++++--
.../org/apache/beam/sdk/util/GcsUtilTest.java | 56 ++++++++++++++++++++
2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------