You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/23 16:16:45 UTC
[1/2] incubator-beam git commit: Closes #826
Repository: incubator-beam
Updated Branches:
refs/heads/master 064796dcc -> a8b2d6cc9
Closes #826
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8b2d6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8b2d6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8b2d6cc
Branch: refs/heads/master
Commit: a8b2d6cc9fd1c2bf9757fc8efd8c30b5df6dcbb9
Parents: 064796d f08f21c
Author: Dan Halperin <dh...@google.com>
Authored: Tue Aug 23 09:13:40 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 23 09:13:40 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 241 +++++++++----------
.../org/apache/beam/sdk/util/GcsUtilTest.java | 69 ++++++
2 files changed, 178 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: FileBasedSink: improve parallelism in GCS copy/remove
Posted by dh...@apache.org.
FileBasedSink: improve parallelism in GCS copy/remove
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f08f21cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f08f21cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f08f21cd
Branch: refs/heads/master
Commit: f08f21cdf4c067745a10b31a6481ed470f97dadc
Parents: 064796d
Author: Dan Halperin <dh...@google.com>
Authored: Sun Aug 14 23:08:21 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 23 09:13:40 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 241 +++++++++----------
.../org/apache/beam/sdk/util/GcsUtilTest.java | 69 ++++++
2 files changed, 178 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f08f21cd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 4e9ee6e..06685e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -30,11 +30,9 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders;
-import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.StorageRequest;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
@@ -47,6 +45,11 @@ import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,12 +62,16 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
/**
* Provides operations on GCS.
@@ -110,7 +117,11 @@ public class GcsUtil {
/**
* Maximum number of requests permitted in a GCS batch request.
*/
- private static final int MAX_REQUESTS_PER_BATCH = 1000;
+ private static final int MAX_REQUESTS_PER_BATCH = 100;
+ /**
+ * Maximum number of concurrent batches of requests executing on GCS.
+ */
+ private static final int MAX_CONCURRENT_BATCHES = 256;
/////////////////////////////////////////////////////////////////////////////
@@ -125,7 +136,6 @@ public class GcsUtil {
// Exposed for testing.
final ExecutorService executorService;
- private final BatchHelper batchHelper;
/**
* Returns true if the given GCS pattern is supported otherwise fails with an
* exception.
@@ -145,8 +155,6 @@ public class GcsUtil {
this.storageClient = storageClient;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
this.executorService = executorService;
- this.batchHelper = new BatchHelper(
- storageClient.getRequestFactory().getInitializer(), storageClient, MAX_REQUESTS_PER_BATCH);
}
// Use this only for testing purposes.
@@ -372,154 +380,123 @@ public class GcsUtil {
}
}
+ private static void executeBatches(List<BatchRequest> batches) throws IOException {
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+ MoreExecutors.getExitingExecutorService(
+ new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>())));
+
+ List<ListenableFuture<Void>> futures = new LinkedList<>();
+ for (final BatchRequest batch : batches) {
+ futures.add(executor.submit(new Callable<Void>() {
+ public Void call() throws IOException {
+ batch.execute();
+ return null;
+ }
+ }));
+ }
+
+ try {
+ Futures.allAsList(futures).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while executing batch GCS request", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Error executing batch GCS request", e);
+ } finally {
+ executor.shutdown();
+ }
+ }
+
public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+ executeBatches(makeCopyBatches(srcFilenames, destFilenames));
+ }
+
+ List<BatchRequest> makeCopyBatches(List<String> srcFilenames, List<String> destFilenames)
+ throws IOException {
checkArgument(
srcFilenames.size() == destFilenames.size(),
"Number of source files %s must equal number of destination files %s",
srcFilenames.size(),
destFilenames.size());
+
+ List<BatchRequest> batches = new LinkedList<>();
+ BatchRequest batch = storageClient.batch();
for (int i = 0; i < srcFilenames.size(); i++) {
final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
- LOG.debug("Copying {} to {}", sourcePath, destPath);
- Storage.Objects.Copy copyObject = storageClient.objects().copy(sourcePath.getBucket(),
- sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null);
- batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
- @Override
- public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
- LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
- }
-
- @Override
- public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
- // Do nothing on item not found.
- if (!errorExtractor.itemNotFound(e)) {
- throw new IOException(e.toString());
- }
- LOG.debug("{} does not exist.", sourcePath);
- }
- });
+ enqueueCopy(sourcePath, destPath, batch);
+ if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
+ batches.add(batch);
+ batch = storageClient.batch();
+ }
}
- batchHelper.flush();
- }
-
- public void remove(Collection<String> filenames) throws IOException {
- for (String filename : filenames) {
- final GcsPath path = GcsPath.fromUri(filename);
- LOG.debug("Removing: " + path);
- Storage.Objects.Delete deleteObject =
- storageClient.objects().delete(path.getBucket(), path.getObject());
- batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
- @Override
- public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
- LOG.debug("Successfully removed {}", path);
- }
-
- @Override
- public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
- // Do nothing on item not found.
- if (!errorExtractor.itemNotFound(e)) {
- throw new IOException(e.toString());
- }
- LOG.debug("{} does not exist.", path);
- }
- });
+ if (batch.size() > 0) {
+ batches.add(batch);
}
- batchHelper.flush();
+ return batches;
}
- /**
- * BatchHelper abstracts out the logic for the maximum requests per batch for GCS.
- *
- * <p>Copy of
- * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
- *
- * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not
- * used in Dataflow. Hadoop-related dependencies will be removed from the Google Cloud Storage
- * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project
- * and others may use the connector without introducing unnecessary dependencies.
- *
- * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical
- * grouping of requests.
- */
- @NotThreadSafe
- private static class BatchHelper {
- /**
- * Callback that causes a single StorageRequest to be added to the BatchRequest.
- */
- protected static interface QueueRequestCallback {
- void enqueue() throws IOException;
+ List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException {
+ List<BatchRequest> batches = new LinkedList<>();
+ for (List<String> filesToDelete :
+ Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) {
+ BatchRequest batch = storageClient.batch();
+ for (String file : filesToDelete) {
+ enqueueDelete(GcsPath.fromUri(file), batch);
+ }
+ batches.add(batch);
}
+ return batches;
+ }
- private final List<QueueRequestCallback> pendingBatchEntries;
- private final BatchRequest batch;
-
- // Number of requests that can be queued into a single actual HTTP request
- // before a sub-batch is sent.
- private final long maxRequestsPerBatch;
-
- // Flag that indicates whether there is an in-progress flush.
- private boolean flushing = false;
+ public void remove(Collection<String> filenames) throws IOException {
+ executeBatches(makeRemoveBatches(filenames));
+ }
- /**
- * Primary constructor, generally accessed only via the inner Factory class.
- */
- public BatchHelper(
- HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
- this.pendingBatchEntries = new LinkedList<>();
- this.batch = gcs.batch(requestInitializer);
- this.maxRequestsPerBatch = maxRequestsPerBatch;
- }
+ private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch)
+ throws IOException {
+ Storage.Objects.Copy copyRequest = storageClient.objects()
+ .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null);
+ copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
+ @Override
+ public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
+ LOG.debug("Successfully copied {} to {}", from, to);
+ }
- /**
- * Adds an additional request to the batch, and possibly flushes the current contents of the
- * batch if {@code maxRequestsPerBatch} has been reached.
- */
- public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback)
- throws IOException {
- QueueRequestCallback queueCallback = new QueueRequestCallback() {
- @Override
- public void enqueue() throws IOException {
- req.queue(batch, callback);
+ @Override
+ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
+ if (errorExtractor.itemNotFound(e)) {
+ // Do nothing on item not found.
+ LOG.debug("{} does not exist, assuming this is a retry after deletion.", from);
+ return;
}
- };
- pendingBatchEntries.add(queueCallback);
-
- flushIfPossibleAndRequired();
- }
-
- // Flush our buffer if we have more pending entries than maxRequestsPerBatch
- private void flushIfPossibleAndRequired() throws IOException {
- if (pendingBatchEntries.size() > maxRequestsPerBatch) {
- flushIfPossible();
+ throw new IOException(
+ String.format("Error trying to copy %s to %s: %s", from, to, e));
}
- }
+ });
+ }
- // Flush our buffer if we are not already in a flush operation and we have data to flush.
- private void flushIfPossible() throws IOException {
- if (!flushing && pendingBatchEntries.size() > 0) {
- flushing = true;
- try {
- while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) {
- QueueRequestCallback head = pendingBatchEntries.remove(0);
- head.enqueue();
- }
+ private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException {
+ Storage.Objects.Delete deleteRequest = storageClient.objects()
+ .delete(file.getBucket(), file.getObject());
+ deleteRequest.queue(batch, new JsonBatchCallback<Void>() {
+ @Override
+ public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+ LOG.debug("Successfully deleted {}", file);
+ }
- batch.execute();
- } finally {
- flushing = false;
+ @Override
+ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
+ if (errorExtractor.itemNotFound(e)) {
+ // Do nothing on item not found.
+ LOG.debug("{} does not exist.", file);
+ return;
}
+ throw new IOException(String.format("Error trying to delete %s: %s", file, e));
}
- }
-
-
- /**
- * Sends any currently remaining requests in the batch; should be called at the end of any
- * series of batched requests to ensure everything has been sent.
- */
- public void flush() throws IOException {
- flushIfPossible();
- }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f08f21cd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 49c7bc4..997340a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.util;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -32,6 +34,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequest;
@@ -490,4 +493,70 @@ public class GcsUtilTest {
HttpResponse response = request.execute();
return GoogleJsonResponseException.from(jsonFactory, response);
}
+
+ private static List<String> makeStrings(String s, int n) {
+ ImmutableList.Builder<String> ret = ImmutableList.builder();
+ for (int i = 0; i < n; ++i) {
+ ret.add(String.format("gs://bucket/%s%d", s, i));
+ }
+ return ret.build();
+ }
+
+ private static int sumBatchSizes(List<BatchRequest> batches) {
+ int ret = 0;
+ for (BatchRequest b : batches) {
+ ret += b.size();
+ assertThat(b.size(), greaterThan(0));
+ }
+ return ret;
+ }
+
+ @Test
+ public void testMakeCopyBatches() throws IOException {
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+ // Small number of files fits in 1 batch
+ List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3));
+ assertThat(batches.size(), equalTo(1));
+ assertThat(sumBatchSizes(batches), equalTo(3));
+
+ // 1 batch of files fits in 1 batch
+ batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100));
+ assertThat(batches.size(), equalTo(1));
+ assertThat(sumBatchSizes(batches), equalTo(100));
+
+ // A little more than 5 batches of files fits in 6 batches
+ batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501));
+ assertThat(batches.size(), equalTo(6));
+ assertThat(sumBatchSizes(batches), equalTo(501));
+ }
+
+ @Test
+ public void testInvalidCopyBatches() throws IOException {
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Number of source files 3");
+
+ gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1));
+ }
+
+ @Test
+ public void testMakeRemoveBatches() throws IOException {
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+ // Small number of files fits in 1 batch
+ List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3));
+ assertThat(batches.size(), equalTo(1));
+ assertThat(sumBatchSizes(batches), equalTo(3));
+
+ // 1 batch of files fits in 1 batch
+ batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
+ assertThat(batches.size(), equalTo(1));
+ assertThat(sumBatchSizes(batches), equalTo(100));
+
+ // A little more than 5 batches of files fits in 6 batches
+ batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
+ assertThat(batches.size(), equalTo(6));
+ assertThat(sumBatchSizes(batches), equalTo(501));
+ }
}