You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/18 20:26:32 UTC

[1/2] beam git commit: GcsUtil: set timeout and retry for BatchRequest with HttpRequestInitializer.

Repository: beam
Updated Branches:
  refs/heads/master c53249de4 -> faa2277b5


GcsUtil: set timeout and retry for BatchRequest with HttpRequestInitializer.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97a76d94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97a76d94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97a76d94

Branch: refs/heads/master
Commit: 97a76d941776300ad3f77017869835327776f62e
Parents: c53249d
Author: Pei He <pe...@google.com>
Authored: Tue Dec 13 17:16:12 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 18 12:26:09 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 20 +++++--
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 56 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97a76d94/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index dcdba46..521673c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -25,6 +25,7 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
 import com.google.api.client.googleapis.json.GoogleJsonError;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.storage.Storage;
@@ -93,8 +94,10 @@ public class GcsUtil {
     public GcsUtil create(PipelineOptions options) {
       LOG.debug("Creating new GcsUtil");
       GcsOptions gcsOptions = options.as(GcsOptions.class);
+      Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
       return new GcsUtil(
-          Transport.newStorageClient(gcsOptions).build(),
+          storageBuilder.build(),
+          storageBuilder.getHttpRequestInitializer(),
           gcsOptions.getExecutorService(),
           gcsOptions.getGcsUploadBufferSizeBytes());
     }
@@ -132,6 +135,7 @@ public class GcsUtil {
 
   /** Client for the GCS API. */
   private Storage storageClient;
+  private final HttpRequestInitializer httpRequestInitializer;
   /** Buffer size for GCS uploads (in bytes). */
   @Nullable private final Integer uploadBufferSizeBytes;
 
@@ -156,9 +160,11 @@ public class GcsUtil {
 
   private GcsUtil(
       Storage storageClient,
+      HttpRequestInitializer httpRequestInitializer,
       ExecutorService executorService,
       @Nullable Integer uploadBufferSizeBytes) {
     this.storageClient = storageClient;
+    this.httpRequestInitializer = httpRequestInitializer;
     this.uploadBufferSizeBytes = uploadBufferSizeBytes;
     this.executorService = executorService;
   }
@@ -526,7 +532,7 @@ public class GcsUtil {
     List<BatchRequest> batches = new LinkedList<>();
     for (List<GcsPath> filesToGet :
         Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
-      BatchRequest batch = storageClient.batch();
+      BatchRequest batch = createBatchRequest();
       for (GcsPath path : filesToGet) {
         results.add(enqueueGetFileSize(path, batch));
       }
@@ -548,14 +554,14 @@ public class GcsUtil {
         destFilenames.size());
 
     List<BatchRequest> batches = new LinkedList<>();
-    BatchRequest batch = storageClient.batch();
+    BatchRequest batch = createBatchRequest();
     for (int i = 0; i < srcFilenames.size(); i++) {
       final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
       final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
       enqueueCopy(sourcePath, destPath, batch);
       if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
         batches.add(batch);
-        batch = storageClient.batch();
+        batch = createBatchRequest();
       }
     }
     if (batch.size() > 0) {
@@ -568,7 +574,7 @@ public class GcsUtil {
     List<BatchRequest> batches = new LinkedList<>();
     for (List<String> filesToDelete :
         Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) {
-      BatchRequest batch = storageClient.batch();
+      BatchRequest batch = createBatchRequest();
       for (String file : filesToDelete) {
         enqueueDelete(GcsPath.fromUri(file), batch);
       }
@@ -648,6 +654,10 @@ public class GcsUtil {
     });
   }
 
+  private BatchRequest createBatchRequest() {
+    return storageClient.batch(httpRequestInitializer);
+  }
+
   /**
    * Expands glob expressions to regular expressions.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/97a76d94/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 6ca87f9..d592761 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -38,6 +38,7 @@ import com.google.api.client.http.HttpResponse;
 import com.google.api.client.http.HttpStatusCodes;
 import com.google.api.client.http.HttpTransport;
 import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
 import com.google.api.client.json.GenericJson;
 import com.google.api.client.json.Json;
 import com.google.api.client.json.JsonFactory;
@@ -55,11 +56,14 @@ import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
 import com.google.cloud.hadoop.util.ClientRequestHelper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigInteger;
 import java.net.SocketTimeoutException;
 import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -415,6 +419,51 @@ public class GcsUtilTest {
   }
 
   @Test
+  public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception {
+    JsonFactory jsonFactory = new JacksonFactory();
+
+    String contentBoundary = "batch_foobarbaz";
+
+    GenericJson error = new GenericJson()
+        .set("error", new GenericJson().set("code", 404));
+    error.setFactory(jsonFactory);
+
+    String content = contentBoundary + "\n"
+        + "Content-Type: application/http\n"
+        + "\n"
+        + "HTTP/1.1 404 Not Found\n"
+        + "Content-Length: 105\n"
+        + "\n"
+        + error.toString();
+    thrown.expect(FileNotFoundException.class);
+
+    final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class);
+    when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary);
+
+    // 429: Too many requests, then 200: OK.
+    when(mockResponse.getStatusCode()).thenReturn(429, 200);
+    when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content));
+
+    // A mock transport that lets us mock the API responses.
+    MockHttpTransport mockTransport =
+        new MockHttpTransport.Builder()
+            .setLowLevelHttpRequest(
+                new MockLowLevelHttpRequest() {
+                  @Override
+                  public LowLevelHttpResponse execute() throws IOException {
+                    return mockResponse;
+                  }
+                })
+            .build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+        gcsUtil.setStorageClient(
+        new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
+    gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
+  }
+
+  @Test
   public void testCreateBucket() throws IOException {
     GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
     GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
@@ -723,4 +772,11 @@ public class GcsUtilTest {
     assertThat(sumBatchSizes(batches), equalTo(501));
     assertEquals(501, results.size());
   }
+
+  /**
+   * A helper to wrap a {@link String} in a UTF-8 encoded content stream.
+   */
+  private static InputStream toStream(String content) throws IOException {
+    return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+  }
 }


[2/2] beam git commit: This closes #1608

Posted by dh...@apache.org.
This closes #1608


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/faa2277b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/faa2277b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/faa2277b

Branch: refs/heads/master
Commit: faa2277b5d85c848ec935a4a0a1d4ba75291a9bc
Parents: c53249d 97a76d9
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jan 18 12:26:19 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jan 18 12:26:19 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 20 +++++--
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 56 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------