You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/06 18:26:50 UTC
[1/2] incubator-beam git commit: Move copy and remove from
FileBasedSink to GcsUtil.
Repository: incubator-beam
Updated Branches:
refs/heads/master 51e1e59b8 -> dc4f2f706
Move copy and remove from FileBasedSink to GcsUtil.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c88bfff6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c88bfff6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c88bfff6
Branch: refs/heads/master
Commit: c88bfff6fc56cea57e447d841f54ed4aa7e03d3b
Parents: 51e1e59
Author: Pei He <pe...@google.com>
Authored: Thu May 5 18:29:00 2016 -0700
Committer: Pei He <pe...@google.com>
Committed: Thu May 5 18:29:00 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 181 +------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 166 ++++++++++++++++-
2 files changed, 172 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c88bfff6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 10e93f5..7d23e7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -19,26 +19,16 @@ package org.apache.beam.sdk.io;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FileIOChannelFactory;
import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.http.HttpHeaders;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.StorageRequest;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
+
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -54,11 +44,8 @@ import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
-import javax.annotation.concurrent.NotThreadSafe;
-
/**
* Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based
* output and defines the format of output files (how values are written, headers/footers, MIME
@@ -611,80 +598,20 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* GCS file system operations.
*/
private static class GcsOperations implements FileOperations {
- private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class);
-
- /**
- * Maximum number of requests permitted in a GCS batch request.
- */
- private static final int MAX_REQUESTS_PER_BATCH = 1000;
-
- private ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
- private GcsOptions gcsOptions;
- private Storage gcs;
- private BatchHelper batchHelper;
+ private final GcsUtil gcsUtil;
public GcsOperations(PipelineOptions options) {
- gcsOptions = options.as(GcsOptions.class);
- gcs = Transport.newStorageClient(gcsOptions).build();
- batchHelper =
- new BatchHelper(gcs.getRequestFactory().getInitializer(), gcs, MAX_REQUESTS_PER_BATCH);
+ gcsUtil = new GcsUtilFactory().create(options);
}
@Override
public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
- Preconditions.checkArgument(
- srcFilenames.size() == destFilenames.size(),
- String.format("Number of source files {} must equal number of destination files {}",
- srcFilenames.size(), destFilenames.size()));
- for (int i = 0; i < srcFilenames.size(); i++) {
- final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
- final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
- LOG.debug("Copying {} to {}", sourcePath, destPath);
- Storage.Objects.Copy copyObject = gcs.objects().copy(sourcePath.getBucket(),
- sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null);
- batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
- @Override
- public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
- LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
- }
-
- @Override
- public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
- // Do nothing on item not found.
- if (!errorExtractor.itemNotFound(e)) {
- throw new IOException(e.toString());
- }
- LOG.debug("{} does not exist.", sourcePath);
- }
- });
- }
- batchHelper.flush();
+ gcsUtil.copy(srcFilenames, destFilenames);
}
@Override
public void remove(Collection<String> filenames) throws IOException {
- for (String filename : filenames) {
- final GcsPath path = GcsPath.fromUri(filename);
- LOG.debug("Removing: " + path);
- Storage.Objects.Delete deleteObject =
- gcs.objects().delete(path.getBucket(), path.getObject());
- batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
- @Override
- public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
- LOG.debug("Successfully removed {}", path);
- }
-
- @Override
- public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
- // Do nothing on item not found.
- if (!errorExtractor.itemNotFound(e)) {
- throw new IOException(e.toString());
- }
- LOG.debug("{} does not exist.", path);
- }
- });
- }
- batchHelper.flush();
+ gcsUtil.remove(filenames);
}
}
@@ -735,98 +662,4 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
}
}
-
- /**
- * BatchHelper abstracts out the logic for the maximum requests per batch for GCS.
- *
- * <p>Copy of
- * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
- *
- * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not
- * used in Dataflow. Hadoop-related dependencies will be removed from the Google Cloud Storage
- * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project
- * and others may use the connector without introducing unnecessary dependencies.
- *
- * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical
- * grouping of requests.
- */
- @NotThreadSafe
- private static class BatchHelper {
- /**
- * Callback that causes a single StorageRequest to be added to the BatchRequest.
- */
- protected static interface QueueRequestCallback {
- void enqueue() throws IOException;
- }
-
- private final List<QueueRequestCallback> pendingBatchEntries;
- private final BatchRequest batch;
-
- // Number of requests that can be queued into a single actual HTTP request
- // before a sub-batch is sent.
- private final long maxRequestsPerBatch;
-
- // Flag that indicates whether there is an in-progress flush.
- private boolean flushing = false;
-
- /**
- * Primary constructor, generally accessed only via the inner Factory class.
- */
- public BatchHelper(
- HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
- this.pendingBatchEntries = new LinkedList<>();
- this.batch = gcs.batch(requestInitializer);
- this.maxRequestsPerBatch = maxRequestsPerBatch;
- }
-
- /**
- * Adds an additional request to the batch, and possibly flushes the current contents of the
- * batch if {@code maxRequestsPerBatch} has been reached.
- */
- public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback)
- throws IOException {
- QueueRequestCallback queueCallback = new QueueRequestCallback() {
- @Override
- public void enqueue() throws IOException {
- req.queue(batch, callback);
- }
- };
- pendingBatchEntries.add(queueCallback);
-
- flushIfPossibleAndRequired();
- }
-
- // Flush our buffer if we have more pending entries than maxRequestsPerBatch
- private void flushIfPossibleAndRequired() throws IOException {
- if (pendingBatchEntries.size() > maxRequestsPerBatch) {
- flushIfPossible();
- }
- }
-
- // Flush our buffer if we are not already in a flush operation and we have data to flush.
- private void flushIfPossible() throws IOException {
- if (!flushing && pendingBatchEntries.size() > 0) {
- flushing = true;
- try {
- while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) {
- QueueRequestCallback head = pendingBatchEntries.remove(0);
- head.enqueue();
- }
-
- batch.execute();
- } finally {
- flushing = false;
- }
- }
- }
-
-
- /**
- * Sends any currently remaining requests in the batch; should be called at the end of any
- * series of batched requests to ensure everything has been sent.
- */
- public void flush() throws IOException {
- flushIfPossible();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c88bfff6/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index f7ef447..aa8774b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -22,10 +22,16 @@ import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
+import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.StorageRequest;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
@@ -47,6 +53,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -55,6 +62,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
/**
* Provides operations on GCS.
@@ -76,7 +84,6 @@ public class GcsUtil {
public GcsUtil create(PipelineOptions options) {
LOG.debug("Creating new GcsUtil");
GcsOptions gcsOptions = options.as(GcsOptions.class);
-
return new GcsUtil(Transport.newStorageClient(gcsOptions).build(),
gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes());
}
@@ -98,6 +105,11 @@ public class GcsUtil {
private static final Pattern RECURSIVE_GCS_PATTERN =
Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");
+ /**
+ * Maximum number of requests permitted in a GCS batch request.
+ */
+ private static final int MAX_REQUESTS_PER_BATCH = 1000;
+
/////////////////////////////////////////////////////////////////////////////
/** Client for the GCS API. */
@@ -111,6 +123,7 @@ public class GcsUtil {
// Exposed for testing.
final ExecutorService executorService;
+ private final BatchHelper batchHelper;
/**
* Returns true if the given GCS pattern is supported otherwise fails with an
* exception.
@@ -130,6 +143,8 @@ public class GcsUtil {
this.storageClient = storageClient;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
this.executorService = executorService;
+ this.batchHelper = new BatchHelper(
+ storageClient.getRequestFactory().getInitializer(), storageClient, MAX_REQUESTS_PER_BATCH);
}
// Use this only for testing purposes.
@@ -355,6 +370,155 @@ public class GcsUtil {
}
}
+  /**
+   * Copies GCS objects in batch: {@code srcFilenames.get(i)} is copied to
+   * {@code destFilenames.get(i)}.
+   *
+   * <p>Each name is parsed with {@code GcsPath.fromUri}. One copy request per pair is queued on
+   * {@code batchHelper}, and {@code batchHelper.flush()} is called before returning. When a
+   * request fails with an error that {@code errorExtractor.itemNotFound} classifies as "not
+   * found", the failure is only logged at debug level; for any other error the failure callback
+   * throws an {@link IOException}.
+   *
+   * @param srcFilenames URIs of the objects to copy
+   * @param destFilenames URIs of the copy destinations, parallel to {@code srcFilenames}
+   * @throws IllegalArgumentException if the two lists differ in size
+   * @throws IOException if queueing or flushing the batched requests fails
+   */
+  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+    // Use Guava's %s message template: String.format does not expand SLF4J-style {}
+    // placeholders, so the sizes would never appear in the error message.
+    Preconditions.checkArgument(
+        srcFilenames.size() == destFilenames.size(),
+        "Number of source files %s must equal number of destination files %s",
+        srcFilenames.size(), destFilenames.size());
+    for (int i = 0; i < srcFilenames.size(); i++) {
+      final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
+      final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
+      LOG.debug("Copying {} to {}", sourcePath, destPath);
+      Storage.Objects.Copy copyObject = storageClient.objects().copy(sourcePath.getBucket(),
+          sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null);
+      batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
+        @Override
+        public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
+          LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
+        }
+
+        @Override
+        public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
+          // Do nothing on item not found.
+          if (!errorExtractor.itemNotFound(e)) {
+            throw new IOException(e.toString());
+          }
+          LOG.debug("{} does not exist.", sourcePath);
+        }
+      });
+    }
+    batchHelper.flush();
+  }
+
+  /**
+   * Deletes GCS objects in batch.
+   *
+   * <p>Each name is parsed with {@code GcsPath.fromUri}. One delete request per name is queued on
+   * {@code batchHelper}, and {@code batchHelper.flush()} is called before returning. When a
+   * request fails with an error that {@code errorExtractor.itemNotFound} classifies as "not
+   * found", the failure is only logged at debug level; for any other error the failure callback
+   * throws an {@link IOException}.
+   *
+   * @param filenames URIs of the objects to delete
+   * @throws IOException if queueing or flushing the batched requests fails
+   */
+  public void remove(Collection<String> filenames) throws IOException {
+    for (String filename : filenames) {
+      final GcsPath path = GcsPath.fromUri(filename);
+      // Parameterized logging: the message is only formatted when debug logging is enabled.
+      LOG.debug("Removing: {}", path);
+      Storage.Objects.Delete deleteObject =
+          storageClient.objects().delete(path.getBucket(), path.getObject());
+      batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
+        @Override
+        public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
+          LOG.debug("Successfully removed {}", path);
+        }
+
+        @Override
+        public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
+          // Do nothing on item not found.
+          if (!errorExtractor.itemNotFound(e)) {
+            throw new IOException(e.toString());
+          }
+          LOG.debug("{} does not exist.", path);
+        }
+      });
+    }
+    batchHelper.flush();
+  }
+
+ /**
+ * BatchHelper abstracts out the logic for the maximum requests per batch for GCS.
+ *
+ * <p>Requests passed to {@code queue} are held as pending callbacks and are only added to the
+ * underlying {@link BatchRequest} during a flush. A flush moves pending entries into the batch
+ * until the batch holds {@code maxRequestsPerBatch} requests or nothing is pending, and then
+ * executes the batch.
+ *
+ * <p>Copy of
+ * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
+ *
+ * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not
+ * used in Dataflow. Hadoop-related dependencies will be removed from the Google Cloud Storage
+ * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project
+ * and others may use the connector without introducing unnecessary dependencies.
+ *
+ * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical
+ * grouping of requests.
+ */
+ @NotThreadSafe
+ private static class BatchHelper {
+ /**
+ * Callback that causes a single StorageRequest to be added to the BatchRequest.
+ */
+ protected static interface QueueRequestCallback {
+ void enqueue() throws IOException;
+ }
+
+ private final List<QueueRequestCallback> pendingBatchEntries;
+ private final BatchRequest batch;
+
+ // Number of requests that can be queued into a single actual HTTP request
+ // before a sub-batch is sent.
+ private final long maxRequestsPerBatch;
+
+ // Flag that indicates whether there is an in-progress flush; checked by flushIfPossible().
+ private boolean flushing = false;
+
+ /**
+ * Creates a helper with an empty pending list and a batch obtained from
+ * {@code gcs.batch(requestInitializer)}.
+ *
+ * <p>NOTE(review): this comment used to say the constructor is "generally accessed only via
+ * the inner Factory class"; no such factory is visible in this copy of the class, so that
+ * remark looks inherited from the upstream original. Confirm.
+ *
+ * @param requestInitializer initializer handed to {@code gcs.batch(...)}
+ * @param gcs storage client used to create the underlying {@link BatchRequest}
+ * @param maxRequestsPerBatch limit on the number of requests placed into the batch per flush
+ */
+ public BatchHelper(
+ HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
+ this.pendingBatchEntries = new LinkedList<>();
+ this.batch = gcs.batch(requestInitializer);
+ this.maxRequestsPerBatch = maxRequestsPerBatch;
+ }
+
+ /**
+ * Adds an additional request to the pending list, and flushes a batch if the number of
+ * pending entries then exceeds {@code maxRequestsPerBatch}.
+ *
+ * <p>The request is not added to the {@link BatchRequest} here; that happens when a flush
+ * invokes the stored callback, which calls {@code req.queue(batch, callback)}.
+ *
+ * @param req the request to send as part of a batch
+ * @param callback the callback registered together with {@code req} on the batch
+ * @throws IOException if a flush triggered by this call fails
+ */
+ public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback)
+ throws IOException {
+ QueueRequestCallback queueCallback = new QueueRequestCallback() {
+ @Override
+ public void enqueue() throws IOException {
+ req.queue(batch, callback);
+ }
+ };
+ pendingBatchEntries.add(queueCallback);
+
+ flushIfPossibleAndRequired();
+ }
+
+ // Flush our buffer if we have more pending entries than maxRequestsPerBatch.
+ private void flushIfPossibleAndRequired() throws IOException {
+ if (pendingBatchEntries.size() > maxRequestsPerBatch) {
+ flushIfPossible();
+ }
+ }
+
+ // Flush our buffer if we are not already in a flush operation and we have data to flush.
+ private void flushIfPossible() throws IOException {
+ if (!flushing && pendingBatchEntries.size() > 0) {
+ flushing = true;
+ try {
+ while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) {
+ QueueRequestCallback head = pendingBatchEntries.remove(0);
+ head.enqueue();
+ }
+
+ batch.execute();
+ } finally {
+ flushing = false;
+ }
+ }
+ }
+
+
+ /**
+ * Sends any currently remaining requests in the batch; should be called at the end of any
+ * series of batched requests to ensure everything has been sent.
+ *
+ * <p>A single call moves pending entries into the batch only until the batch holds
+ * {@code maxRequestsPerBatch} requests, then executes the batch once; it does not loop over
+ * further pending entries.
+ *
+ * @throws IOException if adding a request to the batch or executing the batch fails
+ */
+ public void flush() throws IOException {
+ flushIfPossible();
+ }
+ }
+
/**
* Expands glob expressions to regular expressions.
*
[2/2] incubator-beam git commit: Closes #295
Posted by dh...@apache.org.
Closes #295
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc4f2f70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc4f2f70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc4f2f70
Branch: refs/heads/master
Commit: dc4f2f706e0243c8afc8abbd8563d1d2c3172989
Parents: 51e1e59 c88bfff
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 6 11:26:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 6 11:26:37 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 181 +------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 166 ++++++++++++++++-
2 files changed, 172 insertions(+), 175 deletions(-)
----------------------------------------------------------------------