You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:14 UTC
[10/50] incubator-beam git commit: [BEAM-951] FileBasedSink: merge
FileOperations into IOChannelFactory.
[BEAM-951] FileBasedSink: merge FileOperations into IOChannelFactory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7be3f2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7be3f2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7be3f2f
Branch: refs/heads/python-sdk
Commit: a7be3f2f19d9112d690997f4571ccfadd6a0de25
Parents: 479c19a
Author: Pei He <pe...@google.com>
Authored: Wed Nov 9 17:09:13 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Thu Nov 17 10:54:55 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 154 +------------------
.../beam/sdk/util/FileIOChannelFactory.java | 49 ++++++
.../beam/sdk/util/GcsIOChannelFactory.java | 10 ++
.../apache/beam/sdk/util/IOChannelFactory.java | 22 +++
4 files changed, 85 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index f11fbee..5375b90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
@@ -25,16 +24,11 @@ import static com.google.common.base.Strings.isNullOrEmpty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
-import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -48,10 +42,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.FileIOChannelFactory;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
@@ -415,9 +405,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
if (numFiles > 0) {
LOG.debug("Copying {} files.", numFiles);
- FileOperations fileOperations =
- FileOperationsFactory.getFileOperations(destFilenames.get(0), options);
- fileOperations.copy(srcFilenames, destFilenames);
+ IOChannelUtils.getFactory(destFilenames.get(0))
+ .copy(srcFilenames, destFilenames);
} else {
LOG.info("No output files to write.");
}
@@ -458,8 +447,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
throws IOException {
LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
- FileOperations fileOperations =
- FileOperationsFactory.getFileOperations(tempDirectory, options);
IOChannelFactory factory = IOChannelUtils.getFactory(tempDirectory);
// To partially mitigate the effects of filesystems with eventually-consistent
@@ -477,8 +464,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
tempDirectory,
matches.size(),
allMatches.size() - matches.size());
- fileOperations.remove(allMatches);
- fileOperations.remove(ImmutableList.of(tempDirectory));
+ factory.remove(allMatches);
+ factory.remove(ImmutableList.of(tempDirectory));
}
/**
@@ -640,139 +627,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
}
- // File system operations
- // Warning: These class are purposefully private and will be replaced by more robust file I/O
- // utilities. Not for use outside FileBasedSink.
-
- /**
- * Factory for FileOperations.
- */
- private static class FileOperationsFactory {
- /**
- * Return a FileOperations implementation based on which IOChannel would be used to write to a
- * location specification (not necessarily a filename, as it may contain wildcards).
- *
- * <p>Only supports File and GCS locations (currently, the only factories registered with
- * IOChannelUtils). For other locations, an exception is thrown.
- */
- public static FileOperations getFileOperations(String spec, PipelineOptions options)
- throws IOException {
- IOChannelFactory factory = IOChannelUtils.getFactory(spec);
- if (factory instanceof GcsIOChannelFactory) {
- return new GcsOperations(options);
- } else if (factory instanceof FileIOChannelFactory) {
- return new LocalFileOperations(factory);
- } else {
- throw new IOException("Unrecognized file system.");
- }
- }
- }
-
- /**
- * Copy and Remove operations for files. Operations behave like remove-if-existing and
- * copy-if-existing and do not throw exceptions on file not found to enable retries of these
- * operations in the case of transient error.
- */
- private interface FileOperations {
- /**
- * Copies a collection of files from one location to another.
- *
- * <p>The number of source filenames must equal the number of destination filenames.
- *
- * @param srcFilenames the source filenames.
- * @param destFilenames the destination filenames.
- */
- void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
-
- /**
- * Removes a collection of files or directories.
- *
- * <p>Directories are required to be empty. Non-empty directories will not be deleted,
- * and this method may return silently or throw an exception.
- */
- void remove(Collection<String> filesOrDirs) throws IOException;
- }
-
- /**
- * GCS file system operations.
- */
- private static class GcsOperations implements FileOperations {
- private final GcsUtil gcsUtil;
-
- GcsOperations(PipelineOptions options) {
- gcsUtil = new GcsUtilFactory().create(options);
- }
-
- @Override
- public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
- gcsUtil.copy(srcFilenames, destFilenames);
- }
-
- @Override
- public void remove(Collection<String> filesOrDirs) throws IOException {
- gcsUtil.remove(filesOrDirs);
- }
- }
-
- /**
- * File systems supported by {@link Files}.
- */
- private static class LocalFileOperations implements FileOperations {
- private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);
-
- private final IOChannelFactory factory;
-
- LocalFileOperations(IOChannelFactory factory) {
- this.factory = factory;
- }
-
- @Override
- public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
- checkArgument(
- srcFilenames.size() == destFilenames.size(),
- "Number of source files %s must equal number of destination files %s",
- srcFilenames.size(),
- destFilenames.size());
- int numFiles = srcFilenames.size();
- for (int i = 0; i < numFiles; i++) {
- String src = srcFilenames.get(i);
- String dst = destFilenames.get(i);
- LOG.debug("Copying {} to {}", src, dst);
- copyOne(src, dst);
- }
- }
-
- private void copyOne(String source, String destination) throws IOException {
- try {
- // Copy the source file, replacing the existing destination.
- // Paths.get(x) will not work on win cause of the ":" after the drive letter
- Files.copy(
- new File(source).toPath(),
- new File(destination).toPath(),
- StandardCopyOption.REPLACE_EXISTING);
- } catch (NoSuchFileException e) {
- LOG.debug("{} does not exist.", source);
- // Suppress exception if file does not exist.
- }
- }
-
- @Override
- public void remove(Collection<String> filesOrDirs) throws IOException {
- for (String fileOrDir : filesOrDirs) {
- LOG.debug("Removing file {}", fileOrDir);
- removeOne(fileOrDir);
- }
- }
-
- private void removeOne(String fileOrDir) throws IOException {
- // Delete the file if it exists.
- boolean exists = Files.deleteIfExists(Paths.get(fileOrDir));
- if (!exists) {
- LOG.debug("Tried to delete {}, but it did not exist", fileOrDir);
- }
- }
- }
-
/**
* Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
* and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index b5d85fc..5cba970 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
@@ -36,6 +38,7 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -159,4 +162,50 @@ public class FileIOChannelFactory implements IOChannelFactory {
public Path toPath(String path) {
return specToFile(path).toPath();
}
+
+ @Override
+ public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+ checkArgument(
+ srcFilenames.size() == destFilenames.size(),
+ "Number of source files %s must equal number of destination files %s",
+ srcFilenames.size(),
+ destFilenames.size());
+ int numFiles = srcFilenames.size();
+ for (int i = 0; i < numFiles; i++) {
+ String src = srcFilenames.get(i);
+ String dst = destFilenames.get(i);
+ LOG.debug("Copying {} to {}", src, dst);
+ copyOne(src, dst);
+ }
+ }
+
+ private void copyOne(String source, String destination) throws IOException {
+ Path sourcePath = toPath(source); // Use this factory's spec handling (toPath -> specToFile).
+ try {
+ // Copy the source file, replacing the existing destination.
+ Files.copy(sourcePath, toPath(destination), StandardCopyOption.REPLACE_EXISTING);
+ } catch (NoSuchFileException e) {
+ if (Files.exists(sourcePath)) {
+ throw e; // The source exists, so the missing file is not the source; do not hide it.
+ }
+ LOG.debug("{} does not exist.", source);
+ // Missing source: treat as already copied (copy-if-existing) so retries are harmless.
+ }
+ }
+
+ @Override
+ public void remove(Collection<String> filesOrDirs) throws IOException {
+ for (String fileOrDir : filesOrDirs) {
+ LOG.debug("Removing file {}", fileOrDir);
+ removeOne(fileOrDir);
+ }
+ }
+
+ private void removeOne(String fileOrDir) throws IOException {
+ // Delete if present. Resolve with toPath() (this factory's spec handling), not Paths.get().
+ boolean existed = Files.deleteIfExists(toPath(fileOrDir));
+ if (!existed) {
+ LOG.debug("Tried to delete {}, but it did not exist", fileOrDir);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
index 652e468..bd2ec4e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -89,4 +89,14 @@ public class GcsIOChannelFactory implements IOChannelFactory {
public Path toPath(String path) {
return GcsPath.fromUri(path);
}
+
+ @Override
+ public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+ options.getGcsUtil().copy(srcFilenames, destFilenames);
+ }
+
+ @Override
+ public void remove(Collection<String> filesOrDirs) throws IOException {
+ options.getGcsUtil().remove(filesOrDirs);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7be3f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
index 4e55036..9504f45 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
@@ -23,6 +23,7 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.Collection;
+import java.util.List;
/**
* Defines a factory for working with read and write channels.
@@ -31,7 +32,10 @@ import java.util.Collection;
*
* <p>See <a href="http://docs.oracle.com/javase/7/docs/api/java/nio/channels/package-summary.html"
* >Java NIO Channels</a>
+ *
+ * @deprecated This is under redesign, see: https://issues.apache.org/jira/browse/BEAM-59.
*/
+@Deprecated
public interface IOChannelFactory {
/**
@@ -103,4 +107,22 @@ public interface IOChannelFactory {
/** Converts the given string to a {@link Path}. */
Path toPath(String path);
+
+ /**
+ * Copies each source file to the destination at the same index; missing sources are skipped.
+ *
+ * <p>The number of source filenames must equal the number of destination filenames.
+ *
+ * @param srcFilenames the source filenames.
+ * @param destFilenames the destination filenames.
+ */
+ void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
+
+ /**
+ * Removes a collection of files or directories; entries that do not exist are ignored.
+ *
+ * <p>Directories are required to be empty. Non-empty directories will not be deleted,
+ * and this method may return silently or throw an exception.
+ */
+ void remove(Collection<String> filesOrDirs) throws IOException;
}