You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:14 UTC

[10/50] incubator-beam git commit: [BEAM-951] FileBasedSink: merge FileOperations into IOChannelFactory.

[BEAM-951] FileBasedSink: merge FileOperations into IOChannelFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7be3f2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7be3f2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7be3f2f

Branch: refs/heads/python-sdk
Commit: a7be3f2f19d9112d690997f4571ccfadd6a0de25
Parents: 479c19a
Author: Pei He <pe...@google.com>
Authored: Wed Nov 9 17:09:13 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Thu Nov 17 10:54:55 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 154 +------------------
 .../beam/sdk/util/FileIOChannelFactory.java     |  49 ++++++
 .../beam/sdk/util/GcsIOChannelFactory.java      |  10 ++
 .../apache/beam/sdk/util/IOChannelFactory.java  |  22 +++
 4 files changed, 85 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index f11fbee..5375b90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -25,16 +24,11 @@ import static com.google.common.base.Strings.isNullOrEmpty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -48,10 +42,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.FileIOChannelFactory;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -415,9 +405,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
 
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
-        FileOperations fileOperations =
-            FileOperationsFactory.getFileOperations(destFilenames.get(0), options);
-        fileOperations.copy(srcFilenames, destFilenames);
+        IOChannelUtils.getFactory(destFilenames.get(0))
+            .copy(srcFilenames, destFilenames);
       } else {
         LOG.info("No output files to write.");
       }
@@ -458,8 +447,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
         throws IOException {
       LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
-      FileOperations fileOperations =
-          FileOperationsFactory.getFileOperations(tempDirectory, options);
       IOChannelFactory factory = IOChannelUtils.getFactory(tempDirectory);
 
       // To partially mitigate the effects of filesystems with eventually-consistent
@@ -477,8 +464,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
           tempDirectory,
           matches.size(),
           allMatches.size() - matches.size());
-      fileOperations.remove(allMatches);
-      fileOperations.remove(ImmutableList.of(tempDirectory));
+      factory.remove(allMatches);
+      factory.remove(ImmutableList.of(tempDirectory));
     }
 
     /**
@@ -640,139 +627,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
   }
 
-  // File system operations
-  // Warning: These class are purposefully private and will be replaced by more robust file I/O
-  // utilities. Not for use outside FileBasedSink.
-
-  /**
-   * Factory for FileOperations.
-   */
-  private static class FileOperationsFactory {
-    /**
-     * Return a FileOperations implementation based on which IOChannel would be used to write to a
-     * location specification (not necessarily a filename, as it may contain wildcards).
-     *
-     * <p>Only supports File and GCS locations (currently, the only factories registered with
-     * IOChannelUtils). For other locations, an exception is thrown.
-     */
-    public static FileOperations getFileOperations(String spec, PipelineOptions options)
-        throws IOException {
-      IOChannelFactory factory = IOChannelUtils.getFactory(spec);
-      if (factory instanceof GcsIOChannelFactory) {
-        return new GcsOperations(options);
-      } else if (factory instanceof FileIOChannelFactory) {
-        return new LocalFileOperations(factory);
-      } else {
-        throw new IOException("Unrecognized file system.");
-      }
-    }
-  }
-
-  /**
-   * Copy and Remove operations for files. Operations behave like remove-if-existing and
-   * copy-if-existing and do not throw exceptions on file not found to enable retries of these
-   * operations in the case of transient error.
-   */
-  private interface FileOperations {
-    /**
-     * Copies a collection of files from one location to another.
-     *
-     * <p>The number of source filenames must equal the number of destination filenames.
-     *
-     * @param srcFilenames the source filenames.
-     * @param destFilenames the destination filenames.
-     */
-    void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
-
-    /**
-     * Removes a collection of files or directories.
-     *
-     * <p>Directories are required to be empty. Non-empty directories will not be deleted,
-     * and this method may return silently or throw an exception.
-     */
-    void remove(Collection<String> filesOrDirs) throws IOException;
-  }
-
-  /**
-   * GCS file system operations.
-   */
-  private static class GcsOperations implements FileOperations {
-    private final GcsUtil gcsUtil;
-
-    GcsOperations(PipelineOptions options) {
-      gcsUtil = new GcsUtilFactory().create(options);
-    }
-
-    @Override
-    public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
-      gcsUtil.copy(srcFilenames, destFilenames);
-    }
-
-    @Override
-    public void remove(Collection<String> filesOrDirs) throws IOException {
-      gcsUtil.remove(filesOrDirs);
-    }
-  }
-
-  /**
-   * File systems supported by {@link Files}.
-   */
-  private static class LocalFileOperations implements FileOperations {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);
-
-    private final IOChannelFactory factory;
-
-    LocalFileOperations(IOChannelFactory factory) {
-      this.factory = factory;
-    }
-
-    @Override
-    public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
-      checkArgument(
-          srcFilenames.size() == destFilenames.size(),
-          "Number of source files %s must equal number of destination files %s",
-          srcFilenames.size(),
-          destFilenames.size());
-      int numFiles = srcFilenames.size();
-      for (int i = 0; i < numFiles; i++) {
-        String src = srcFilenames.get(i);
-        String dst = destFilenames.get(i);
-        LOG.debug("Copying {} to {}", src, dst);
-        copyOne(src, dst);
-      }
-    }
-
-    private void copyOne(String source, String destination) throws IOException {
-      try {
-        // Copy the source file, replacing the existing destination.
-        // Paths.get(x) will not work on win cause of the ":" after the drive letter
-        Files.copy(
-                new File(source).toPath(),
-                new File(destination).toPath(),
-                StandardCopyOption.REPLACE_EXISTING);
-      } catch (NoSuchFileException e) {
-        LOG.debug("{} does not exist.", source);
-        // Suppress exception if file does not exist.
-      }
-    }
-
-    @Override
-    public void remove(Collection<String> filesOrDirs) throws IOException {
-      for (String fileOrDir : filesOrDirs) {
-        LOG.debug("Removing file {}", fileOrDir);
-        removeOne(fileOrDir);
-      }
-    }
-
-    private void removeOne(String fileOrDir) throws IOException {
-      // Delete the file if it exists.
-      boolean exists = Files.deleteIfExists(Paths.get(fileOrDir));
-      if (!exists) {
-        LOG.debug("Tried to delete {}, but it did not exist", fileOrDir);
-      }
-    }
-  }
-
   /**
    * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
    * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index b5d85fc..5cba970 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
@@ -36,6 +38,7 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -159,4 +162,50 @@ public class FileIOChannelFactory implements IOChannelFactory {
   public Path toPath(String path) {
     return specToFile(path).toPath();
   }
+
+  @Override
+  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+    int srcCount = srcFilenames.size();
+    int destCount = destFilenames.size();
+    checkArgument(srcCount == destCount,
+        "Number of source files %s must equal number of destination files %s",
+        srcCount, destCount);
+    // Pairwise copy by index; copyOne skips missing sources so a retried copy does not fail.
+    for (int idx = 0; idx < srcCount; idx++) {
+      String from = srcFilenames.get(idx);
+      String to = destFilenames.get(idx);
+      LOG.debug("Copying {} to {}", from, to);
+      copyOne(from, to);
+    }
+  }
+
+  private void copyOne(String source, String destination) throws IOException {
+    try {
+      // Copy the source file, replacing any existing destination. The Path is built via
+      // new File(x).toPath() because Paths.get(x) fails on Windows on the drive-letter ":".
+      Files.copy(
+          new File(source).toPath(),
+          new File(destination).toPath(),
+          StandardCopyOption.REPLACE_EXISTING);
+    } catch (NoSuchFileException e) {
+      LOG.debug("{} does not exist.", source);
+      // A missing source is not an error, so that a partially completed copy can be retried.
+    }
+  }
+
+  @Override
+  public void remove(Collection<String> filesOrDirs) throws IOException {
+    for (String target : filesOrDirs) {
+      LOG.debug("Removing file {}", target);
+      removeOne(target);
+    }
+  }
+
+  private void removeOne(String fileOrDir) throws IOException {
+    // Delete if present; build the Path like copyOne does (Paths.get breaks on Windows).
+    boolean exists = Files.deleteIfExists(new File(fileOrDir).toPath());
+    if (!exists) {
+      LOG.debug("Tried to delete {}, but it did not exist", fileOrDir);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
index 652e468..bd2ec4e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -89,4 +89,14 @@ public class GcsIOChannelFactory implements IOChannelFactory {
   public Path toPath(String path) {
     return GcsPath.fromUri(path);
   }
+
+  @Override
+  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+    options.getGcsUtil().copy(srcFilenames, destFilenames);
+  }
+
+  @Override
+  public void remove(Collection<String> filesOrDirs) throws IOException {
+    options.getGcsUtil().remove(filesOrDirs);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
index 4e55036..9504f45 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
@@ -23,6 +23,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Defines a factory for working with read and write channels.
@@ -31,7 +32,10 @@ import java.util.Collection;
  *
  * <p>See <a href="http://docs.oracle.com/javase/7/docs/api/java/nio/channels/package-summary.html"
  * >Java NIO Channels</a>
+ *
+ * @deprecated This is under redesign, see: https://issues.apache.org/jira/browse/BEAM-59.
  */
+@Deprecated
 public interface IOChannelFactory {
 
   /**
@@ -103,4 +107,22 @@ public interface IOChannelFactory {
 
   /** Converts the given string to a {@link Path}. */
   Path toPath(String path);
+
+  /**
+   * Copies each source file to the destination filename at the same index. Implementations
+   * should skip missing sources rather than fail, so that a partial copy can be retried.
+   *
+   * @param srcFilenames the source filenames.
+   * @param destFilenames the destination filenames; must be the same size as srcFilenames.
+   * @throws IOException if a copy fails for a reason other than a missing source.
+   */
+  void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
+
+  /**
+   * Removes a collection of files or directories. Entries that do not exist should be ignored,
+   * so that a partially completed removal can be retried.
+   *
+   * <p>Non-empty directories are not deleted; the call may then return silently or throw.
+   */
+  void remove(Collection<String> filesOrDirs) throws IOException;
 }