You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/06 18:26:50 UTC

[1/2] incubator-beam git commit: Move copy and remove from FileBasedSink to GcsUtil.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 51e1e59b8 -> dc4f2f706


Move copy and remove from FileBasedSink to GcsUtil.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c88bfff6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c88bfff6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c88bfff6

Branch: refs/heads/master
Commit: c88bfff6fc56cea57e447d841f54ed4aa7e03d3b
Parents: 51e1e59
Author: Pei He <pe...@google.com>
Authored: Thu May 5 18:29:00 2016 -0700
Committer: Pei He <pe...@google.com>
Committed: Thu May 5 18:29:00 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 181 +------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 166 ++++++++++++++++-
 2 files changed, 172 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c88bfff6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 10e93f5..7d23e7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -19,26 +19,16 @@ package org.apache.beam.sdk.io;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.FileIOChannelFactory;
 import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.http.HttpHeaders;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.StorageRequest;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
+
 import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
@@ -54,11 +44,8 @@ import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 /**
  * Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based
  * output and defines the format of output files (how values are written, headers/footers, MIME
@@ -611,80 +598,20 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * GCS file system operations.
    */
   private static class GcsOperations implements FileOperations {
-    private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class);
-
-    /**
-     * Maximum number of requests permitted in a GCS batch request.
-     */
-    private static final int MAX_REQUESTS_PER_BATCH = 1000;
-
-    private ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-    private GcsOptions gcsOptions;
-    private Storage gcs;
-    private BatchHelper batchHelper;
+    private final GcsUtil gcsUtil;  // Performs the batched GCS copy and remove calls.

     public GcsOperations(PipelineOptions options) {
-      gcsOptions = options.as(GcsOptions.class);
-      gcs = Transport.newStorageClient(gcsOptions).build();
-      batchHelper =
-          new BatchHelper(gcs.getRequestFactory().getInitializer(), gcs, MAX_REQUESTS_PER_BATCH);
+      gcsUtil = new GcsUtilFactory().create(options);  // New GcsUtil with its own client.
     }

     @Override
     public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
-      Preconditions.checkArgument(
-          srcFilenames.size() == destFilenames.size(),
-          String.format("Number of source files {} must equal number of destination files {}",
-              srcFilenames.size(), destFilenames.size()));
-      for (int i = 0; i < srcFilenames.size(); i++) {
-        final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
-        final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
-        LOG.debug("Copying {} to {}", sourcePath, destPath);
-        Storage.Objects.Copy copyObject = gcs.objects().copy(sourcePath.getBucket(),
-            sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null);
-        batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
-          @Override
-          public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
-            LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
-          }
-
-          @Override
-          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-            // Do nothing on item not found.
-            if (!errorExtractor.itemNotFound(e)) {
-              throw new IOException(e.toString());
-            }
-            LOG.debug("{} does not exist.", sourcePath);
-          }
-        });
-      }
-      batchHelper.flush();
+      gcsUtil.copy(srcFilenames, destFilenames);
     }

     @Override
     public void remove(Collection<String> filenames) throws IOException {
-      for (String filename : filenames) {
-        final GcsPath path = GcsPath.fromUri(filename);
-        LOG.debug("Removing: " + path);
-        Storage.Objects.Delete deleteObject =
-            gcs.objects().delete(path.getBucket(), path.getObject());
-        batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
-          @Override
-          public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
-            LOG.debug("Successfully removed {}", path);
-          }
-
-          @Override
-          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-            // Do nothing on item not found.
-            if (!errorExtractor.itemNotFound(e)) {
-              throw new IOException(e.toString());
-            }
-            LOG.debug("{} does not exist.", path);
-          }
-        });
-      }
-      batchHelper.flush();
+      gcsUtil.remove(filenames);
     }
   }

@@ -735,98 +662,4 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       }
     }
   }
-
-  /**
-   * BatchHelper abstracts out the logic for the maximum requests per batch for GCS.
-   *
-   * <p>Copy of
-   * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
-   *
-   * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not
-   * used in Dataflow.  Hadoop-related dependencies will be removed from the Google Cloud Storage
-   * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project
-   * and others may use the connector without introducing unnecessary dependencies.
-   *
-   * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical
-   * grouping of requests.
-   */
-  @NotThreadSafe
-  private static class BatchHelper {
-    /**
-     * Callback that causes a single StorageRequest to be added to the BatchRequest.
-     */
-    protected static interface QueueRequestCallback {
-      void enqueue() throws IOException;
-    }
-
-    private final List<QueueRequestCallback> pendingBatchEntries;
-    private final BatchRequest batch;
-
-    // Number of requests that can be queued into a single actual HTTP request
-    // before a sub-batch is sent.
-    private final long maxRequestsPerBatch;
-
-    // Flag that indicates whether there is an in-progress flush.
-    private boolean flushing = false;
-
-    /**
-     * Primary constructor, generally accessed only via the inner Factory class.
-     */
-    public BatchHelper(
-        HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
-      this.pendingBatchEntries = new LinkedList<>();
-      this.batch = gcs.batch(requestInitializer);
-      this.maxRequestsPerBatch = maxRequestsPerBatch;
-    }
-
-    /**
-     * Adds an additional request to the batch, and possibly flushes the current contents of the
-     * batch if {@code maxRequestsPerBatch} has been reached.
-     */
-    public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback)
-        throws IOException {
-      QueueRequestCallback queueCallback = new QueueRequestCallback() {
-        @Override
-        public void enqueue() throws IOException {
-          req.queue(batch, callback);
-        }
-      };
-      pendingBatchEntries.add(queueCallback);
-
-      flushIfPossibleAndRequired();
-    }
-
-    // Flush our buffer if we have more pending entries than maxRequestsPerBatch
-    private void flushIfPossibleAndRequired() throws IOException {
-      if (pendingBatchEntries.size() > maxRequestsPerBatch) {
-        flushIfPossible();
-      }
-    }
-
-    // Flush our buffer if we are not already in a flush operation and we have data to flush.
-    private void flushIfPossible() throws IOException {
-      if (!flushing && pendingBatchEntries.size() > 0) {
-        flushing = true;
-        try {
-          while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) {
-            QueueRequestCallback head = pendingBatchEntries.remove(0);
-            head.enqueue();
-          }
-
-          batch.execute();
-        } finally {
-          flushing = false;
-        }
-      }
-    }
-
-
-    /**
-     * Sends any currently remaining requests in the batch; should be called at the end of any
-     * series of batched requests to ensure everything has been sent.
-     */
-    public void flush() throws IOException {
-      flushIfPossible();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c88bfff6/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index f7ef447..aa8774b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -22,10 +22,16 @@ import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
+import com.google.api.client.googleapis.json.GoogleJsonError;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.StorageRequest;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
@@ -47,6 +53,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,6 +62,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 
 /**
  * Provides operations on GCS.
@@ -76,7 +84,6 @@ public class GcsUtil {
     public GcsUtil create(PipelineOptions options) {
       LOG.debug("Creating new GcsUtil");
       GcsOptions gcsOptions = options.as(GcsOptions.class);
-
       return new GcsUtil(Transport.newStorageClient(gcsOptions).build(),
           gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes());
     }
@@ -98,6 +105,11 @@ public class GcsUtil {
   private static final Pattern RECURSIVE_GCS_PATTERN =
       Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");

+  /**
+   * Maximum number of requests permitted in a GCS batch request.
+   */
+  private static final int MAX_REQUESTS_PER_BATCH = 1000;
+
   /////////////////////////////////////////////////////////////////////////////

   /** Client for the GCS API. */
@@ -111,6 +123,7 @@ public class GcsUtil {
   // Exposed for testing.
   final ExecutorService executorService;

+  private final BatchHelper batchHelper;  // NOTE(review): @NotThreadSafe; prefer one per call.
   /**
    * Returns true if the given GCS pattern is supported otherwise fails with an
    * exception.
@@ -130,6 +143,8 @@ public class GcsUtil {
     this.storageClient = storageClient;
     this.uploadBufferSizeBytes = uploadBufferSizeBytes;
     this.executorService = executorService;
+    this.batchHelper = new BatchHelper(
+        storageClient.getRequestFactory().getInitializer(), storageClient, MAX_REQUESTS_PER_BATCH);
   }

   // Use this only for testing purposes.
@@ -355,6 +370,155 @@ public class GcsUtil {
      }
   }
 
+  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+    Preconditions.checkArgument(srcFilenames.size() == destFilenames.size(),
+        "Number of source files %s must equal number of destination files %s",
+        srcFilenames.size(), destFilenames.size());
+    // BatchHelper is @NotThreadSafe and keeps unsent requests if a flush fails: one per call.
+    BatchHelper batcher = new BatchHelper(
+        storageClient.getRequestFactory().getInitializer(), storageClient, MAX_REQUESTS_PER_BATCH);
+    for (int i = 0; i < srcFilenames.size(); i++) {
+      final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
+      final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
+      LOG.debug("Copying {} to {}", sourcePath, destPath);
+      Storage.Objects.Copy copyObject = storageClient.objects().copy(sourcePath.getBucket(),
+          sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null);
+      batcher.queue(copyObject, new JsonBatchCallback<StorageObject>() {
+        @Override
+        public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
+          LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
+        }
+        @Override
+        public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
+          if (!errorExtractor.itemNotFound(e)) {  // Do nothing on item not found.
+            throw new IOException(e.toString());
+          }
+          LOG.debug("{} does not exist.", sourcePath);
+        }
+      });
+    }
+    batcher.flush();
+  }
+
+  public void remove(Collection<String> filenames) throws IOException {
+    BatchHelper batcher = new BatchHelper(  // @NotThreadSafe and stateful: one per call.
+        storageClient.getRequestFactory().getInitializer(), storageClient, MAX_REQUESTS_PER_BATCH);
+    for (String filename : filenames) {
+      final GcsPath path = GcsPath.fromUri(filename);
+      LOG.debug("Removing: {}", path);
+      Storage.Objects.Delete deleteObject =
+          storageClient.objects().delete(path.getBucket(), path.getObject());
+      batcher.queue(deleteObject, new JsonBatchCallback<Void>() {
+        @Override
+        public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
+          LOG.debug("Successfully removed {}", path);
+        }
+        @Override
+        public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
+          if (!errorExtractor.itemNotFound(e)) {  // Do nothing on item not found.
+            throw new IOException(e.toString());
+          }
+          LOG.debug("{} does not exist.", path);
+        }
+      });
+    }
+    batcher.flush();
+  }
+
+  /**
+   * BatchHelper abstracts out the logic for the maximum requests per batch for GCS.
+   *
+   * <p>Copy of
+   * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
+   *
+   * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not
+   * used in Dataflow.  Hadoop-related dependencies will be removed from the Google Cloud Storage
+   * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project
+   * and others may use the connector without introducing unnecessary dependencies.
+   *
+   * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical
+   * grouping of requests.
+   */
+  @NotThreadSafe
+  private static class BatchHelper {
+    /**
+     * Callback that causes a single StorageRequest to be added to the BatchRequest.
+     */
+    protected static interface QueueRequestCallback {
+      void enqueue() throws IOException;
+    }
+
+    private final List<QueueRequestCallback> pendingBatchEntries;
+    private final BatchRequest batch;
+
+    // Number of requests that can be queued into a single actual HTTP request
+    // before a sub-batch is sent.
+    private final long maxRequestsPerBatch;
+
+    // Flag that indicates whether there is an in-progress flush.
+    private boolean flushing = false;
+
+    /**
+     * Creates a helper that batches requests via {@code gcs.batch(requestInitializer)}.
+     */
+    public BatchHelper(
+        HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
+      this.pendingBatchEntries = new LinkedList<>();
+      this.batch = gcs.batch(requestInitializer);
+      this.maxRequestsPerBatch = maxRequestsPerBatch;
+    }
+
+    /**
+     * Queues a request; when more than {@code maxRequestsPerBatch} are pending, moves up to that
+     * many into the underlying batch and executes it.
+     */
+    public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback)
+        throws IOException {
+      QueueRequestCallback queueCallback = new QueueRequestCallback() {
+        @Override
+        public void enqueue() throws IOException {
+          req.queue(batch, callback);
+        }
+      };
+      pendingBatchEntries.add(queueCallback);
+
+      flushIfPossibleAndRequired();
+    }
+
+    // Flush our buffer if we have more pending entries than maxRequestsPerBatch
+    private void flushIfPossibleAndRequired() throws IOException {
+      if (pendingBatchEntries.size() > maxRequestsPerBatch) {
+        flushIfPossible();
+      }
+    }
+
+    // Flush our buffer if we are not already in a flush operation and we have data to flush.
+    private void flushIfPossible() throws IOException {
+      if (!flushing && pendingBatchEntries.size() > 0) {
+        flushing = true;
+        try {
+          while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) {
+            QueueRequestCallback head = pendingBatchEntries.remove(0);
+            head.enqueue();
+          }
+
+          batch.execute();  // On failure, unsent pendingBatchEntries are kept.
+        } finally {
+          flushing = false;
+        }
+      }
+    }
+
+
+    /**
+     * Sends any currently remaining requests in the batch; should be called at the end of any
+     * series of batched requests to ensure everything has been sent.
+     */
+    public void flush() throws IOException {
+      flushIfPossible();
+    }
+  }
+
   /**
    * Expands glob expressions to regular expressions.
    *


[2/2] incubator-beam git commit: Closes #295

Posted by dh...@apache.org.
Closes #295


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc4f2f70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc4f2f70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc4f2f70

Branch: refs/heads/master
Commit: dc4f2f706e0243c8afc8abbd8563d1d2c3172989
Parents: 51e1e59 c88bfff
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 6 11:26:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 6 11:26:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 181 +------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 166 ++++++++++++++++-
 2 files changed, 172 insertions(+), 175 deletions(-)
----------------------------------------------------------------------