You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/03/30 11:30:54 UTC

[1/2] beam git commit: [BEAM-59] Beam FileSystems: add match(), copy(), rename(), delete() utilities.

Repository: beam
Updated Branches:
  refs/heads/master 536343142 -> 769398e40


[BEAM-59] Beam FileSystems: add match(), copy(), rename(), delete() utilities.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d07ef530
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d07ef530
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d07ef530

Branch: refs/heads/master
Commit: d07ef530dac17e9064cd0003470163ccbef7dfef
Parents: 5363431
Author: Pei He <pe...@gmail.com>
Authored: Fri Mar 10 14:15:18 2017 +0800
Committer: Pei He <pe...@gmail.com>
Committed: Thu Mar 30 19:23:56 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileSystems.java     | 302 +++++++++++++++++--
 .../org/apache/beam/sdk/io/LocalFileSystem.java |  57 ++--
 .../org/apache/beam/sdk/io/LocalResourceId.java |   2 +-
 .../org/apache/beam/sdk/io/fs/MoveOptions.java  |  34 +++
 .../org/apache/beam/sdk/io/fs/ResourceId.java   |   8 +
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  10 -
 .../org/apache/beam/sdk/io/FileSystemsTest.java | 168 ++++++++++-
 .../apache/beam/sdk/io/LocalFileSystemTest.java |  12 +-
 .../beam/sdk/io/gcp/storage/GcsResourceId.java  |   2 +-
 9 files changed, 516 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index b39e517..96306dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -17,32 +17,46 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
+
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
+
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.KV;
 
 /**
  * Clients facing {@link FileSystem} utility.
@@ -50,6 +64,8 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
 public class FileSystems {
 
   public static final String DEFAULT_SCHEME = "default";
+  private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
+      "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");
 
   private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
       new ConcurrentHashMap<>();
@@ -63,6 +79,60 @@ public class FileSystems {
   /********************************** METHODS FOR CLIENT **********************************/
 
   /**
+   * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
+   * Callers should use {@link #match} to resolve ambiguities in user-provided specs before
+   * calling other methods.
+   *
+   * <p>Implementation handles the following ambiguities of a user-provided spec:
+   * <ol>
+   * <li>{@code spec} could be a glob or a uri. {@link #match} should be able to tell and
+   * choose efficient implementations.
+   * <li>The user-provided {@code spec} might refer to files or directories. It is common that
+   * users that wish to indicate a directory will omit the trailing path delimiter, such as
+   * {@code "/tmp/dir"} in Linux. The {@link FileSystem} should be able to recognize a directory
+   * with the trailing path delimiter omitted, but should always return a correct {@link ResourceId}
+   * (e.g., {@code "/tmp/dir/"}) inside the returned {@link MatchResult}.
+   * </ol>
+   *
+   * <p>All {@link FileSystem} implementations should support glob in the final hierarchical path
+   * component of {@link ResourceId}. This allows SDK libraries to construct file system agnostic
+   * spec. {@link FileSystem FileSystems} can support additional patterns for user-provided specs.
+   *
+   * @return {@code List<MatchResult>} in the same order as the input specs.
+   *
+   * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes.
+   * @throws IOException if all specs fail to match due to issues such as
+   * network connection or authorization failures.
+   * An exception for an individual spec is deferred until callers retrieve
+   * metadata with {@link MatchResult#metadata()}.
+   */
+  public static List<MatchResult> match(List<String> specs) throws IOException {
+    return getFileSystemInternal(getOnlyScheme(specs)).match(specs);
+  }
+
+  /**
+   * Returns {@link MatchResult MatchResults} for the given {@link ResourceId resourceIds}.
+   *
+   * @param resourceIds {@link ResourceId resourceIds} that might be derived from {@link #match},
+   * {@link ResourceId#resolve}, or {@link ResourceId#getCurrentDirectory()}.
+   *
+   * @throws IOException if all {@code resourceIds} fail to match due to issues such as
+   * network connection or authorization failures.
+   * An exception for an individual {@link ResourceId} is deferred until callers retrieve
+   * metadata with {@link MatchResult#metadata()}.
+   */
+  public static List<MatchResult> matchResources(List<ResourceId> resourceIds) throws IOException {
+    return match(FluentIterable
+        .from(resourceIds)
+        .transform(new Function<ResourceId, String>() {
+          @Override
+          public String apply(@Nonnull ResourceId resourceId) {
+          return resourceId.toString();
+          }})
+        .toList());
+  }
+
+  /**
    * Returns a write channel for the given {@link ResourceId}.
    *
    * <p>The resource is not expanded; it is used verbatim.
@@ -85,7 +155,7 @@ public class FileSystems {
    */
   public static WritableByteChannel create(ResourceId resourceId, CreateOptions createOptions)
       throws IOException {
-    return getFileSystemInternal(resourceId).create(resourceId, createOptions);
+    return getFileSystemInternal(resourceId.getScheme()).create(resourceId, createOptions);
   }
 
   /**
@@ -99,44 +169,206 @@ public class FileSystems {
    * @param resourceId the reference of the file-like resource to open
    */
   public static ReadableByteChannel open(ResourceId resourceId) throws IOException {
-    return getFileSystemInternal(resourceId).open(resourceId);
+    return getFileSystemInternal(resourceId.getScheme()).open(resourceId);
   }
 
-  /********************************** METHODS FOR REGISTRATION **********************************/
-
   /**
-   * Loads available {@link FileSystemRegistrar} services.
+   * Copies a {@link List} of file-like resources from one location to another.
+   *
+   * <p>The number of source resources must equal the number of destination resources.
+   * Destination resources will be created recursively.
+   *
+   * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
+   *
+   * <p>It doesn't support copying globs.
+   *
+   * @param srcResourceIds the references of the source resources
+   * @param destResourceIds the references of the destination resources
    */
-  private static void loadFileSystemRegistrars() {
-    SCHEME_TO_REGISTRAR.clear();
-    Set<FileSystemRegistrar> registrars =
-        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
-    registrars.addAll(Lists.newArrayList(
-        ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
+  public static void copy(
+      List<ResourceId> srcResourceIds,
+      List<ResourceId> destResourceIds,
+      MoveOptions... moveOptions) throws IOException {
+    validateOnlyScheme(srcResourceIds, destResourceIds);
 
-    verifySchemesAreUnique(registrars);
+    List<ResourceId> srcToCopy;
+    List<ResourceId> destToCopy;
+    if (Sets.newHashSet(moveOptions).contains(
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
+      KV<List<ResourceId>, List<ResourceId>> existings =
+          filterMissingFiles(srcResourceIds, destResourceIds);
+      srcToCopy = existings.getKey();
+      destToCopy = existings.getValue();
+    } else {
+      srcToCopy = srcResourceIds;
+      destToCopy = destResourceIds;
+    }
+    if (srcToCopy.isEmpty()) {
+      return;
+    }
+    getFileSystemInternal(srcToCopy.iterator().next().getScheme())
+        .copy(srcToCopy, destToCopy);
+  }
 
-    for (FileSystemRegistrar registrar : registrars) {
-      SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar);
+  /**
+   * Renames a {@link List} of file-like resources from one location to another.
+   *
+   * <p>The number of source resources must equal the number of destination resources.
+   * Destination resources will be created recursively.
+   *
+   * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
+   *
+   * <p>It doesn't support renaming globs.
+   *
+   * @param srcResourceIds the references of the source resources
+   * @param destResourceIds the references of the destination resources
+   */
+  public static void rename(
+      List<ResourceId> srcResourceIds,
+      List<ResourceId> destResourceIds,
+      MoveOptions... moveOptions) throws IOException {
+    validateOnlyScheme(srcResourceIds, destResourceIds);
+    List<ResourceId> srcToRename;
+    List<ResourceId> destToRename;
+
+    if (Sets.newHashSet(moveOptions).contains(
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
+      KV<List<ResourceId>, List<ResourceId>> existings =
+          filterMissingFiles(srcResourceIds, destResourceIds);
+      srcToRename = existings.getKey();
+      destToRename = existings.getValue();
+    } else {
+      srcToRename = srcResourceIds;
+      destToRename = destResourceIds;
+    }
+    if (srcToRename.isEmpty()) {
+      return;
     }
+    getFileSystemInternal(srcToRename.iterator().next().getScheme())
+        .rename(srcToRename, destToRename);
   }
 
   /**
-   * Sets the default configuration in workers.
+   * Deletes a collection of resources.
    *
-   * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+   * <p>It is allowed but not recommended to delete directories recursively.
+   * Callers depend on {@link FileSystems} and use {@code DeleteOptions}.
+   *
+   * <p>{@code resourceIds} must have the same scheme.
+   *
+   * @param resourceIds the references of the resources to delete.
    */
-  public static void setDefaultConfigInWorkers(PipelineOptions options) {
-    defaultConfig = checkNotNull(options, "options");
+  public static void delete(
+      Collection<ResourceId> resourceIds, MoveOptions... moveOptions) throws IOException {
+    Collection<ResourceId> resourceIdsToDelete;
+    if (Sets.newHashSet(moveOptions).contains(
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
+      resourceIdsToDelete = FluentIterable
+          .from(matchResources(Lists.newArrayList(resourceIds)))
+          .filter(new Predicate<MatchResult>() {
+            @Override
+            public boolean apply(@Nonnull MatchResult matchResult) {
+              return !matchResult.status().equals(MatchResult.Status.NOT_FOUND);
+            }})
+          .transformAndConcat(new Function<MatchResult, Iterable<Metadata>>() {
+            @Nonnull
+            @Override
+            public Iterable<Metadata> apply(@Nonnull MatchResult input) {
+              try {
+                return Lists.newArrayList(input.metadata());
+              } catch (IOException e) {
+                throw new RuntimeException(
+                    String.format("Failed to get metadata from MatchResult: %s.", input),
+                    e);
+              }
+            }})
+          .transform(new Function<Metadata, ResourceId>() {
+            @Nonnull
+            @Override
+            public ResourceId apply(@Nonnull Metadata input) {
+              return input.resourceId();
+            }})
+          .toList();
+    } else {
+      resourceIdsToDelete = resourceIds;
+    }
+    if (resourceIdsToDelete.isEmpty()) {
+      return;
+    }
+    getFileSystemInternal(resourceIdsToDelete.iterator().next().getScheme())
+        .delete(resourceIdsToDelete);
+  }
+
+  private static KV<List<ResourceId>, List<ResourceId>> filterMissingFiles(
+      List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) throws IOException {
+    List<ResourceId> srcToHandle = new ArrayList<>();
+    List<ResourceId> destToHandle = new ArrayList<>();
+
+    List<MatchResult> matchResults = matchResources(srcResourceIds);
+    for (int i = 0; i < matchResults.size(); ++i) {
+      if (!matchResults.get(i).status().equals(Status.NOT_FOUND)) {
+        srcToHandle.add(srcResourceIds.get(i));
+        destToHandle.add(destResourceIds.get(i));
+      }
+    }
+    return KV.of(srcToHandle, destToHandle);
+  }
+
+  private static void validateOnlyScheme(
+      List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) {
+    checkArgument(
+        srcResourceIds.size() == destResourceIds.size(),
+        "Number of source resource ids %s must equal number of destination resource ids %s",
+        srcResourceIds.size(),
+        destResourceIds.size());
+    Set<String> schemes = FluentIterable.from(srcResourceIds)
+        .append(destResourceIds)
+        .transform(new Function<ResourceId, String>() {
+          @Override
+          public String apply(@Nonnull ResourceId resourceId) {
+            return resourceId.getScheme();
+          }})
+        .toSet();
+    checkArgument(
+        schemes.size() == 1,
+        String.format(
+            "Expect srcResourceIds and destResourceIds have the same scheme, but received %s.",
+            Joiner.on(", ").join(schemes)));
+  }
+
+  private static String getOnlyScheme(List<String> specs) {
+    checkArgument(!specs.isEmpty(), "Expect specs are not empty.");
+    Set<String> schemes = FluentIterable.from(specs)
+        .transform(new Function<String, String>() {
+          @Override
+          public String apply(String spec) {
+            return parseScheme(spec);
+          }})
+        .toSet();
+    return Iterables.getOnlyElement(schemes);
+  }
+
+  private static String parseScheme(String spec) {
+    // The spec is almost, but not quite, a URI. In particular,
+    // the reserved characters '[', ']', and '?' have meanings that differ
+    // from their use in the URI spec. ('*' is not reserved).
+    // Here, we just need the scheme, which is so circumscribed as to be
+    // very easy to extract with a regex.
+    Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
+
+    if (!matcher.matches()) {
+      return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
+    } else {
+      return matcher.group("scheme").toLowerCase();
+    }
   }
 
   /**
    * Internal method to get {@link FileSystem} for {@code spec}.
    */
   @VisibleForTesting
-  static FileSystem getFileSystemInternal(ResourceId resourceId) {
-    String lowerCaseScheme = resourceId.getScheme().toLowerCase();
-    return getRegistrarInternal(lowerCaseScheme).fromOptions(defaultConfig);
+  static FileSystem getFileSystemInternal(String scheme) {
+    return getRegistrarInternal(scheme.toLowerCase()).fromOptions(defaultConfig);
   }
 
   /**
@@ -154,6 +386,34 @@ public class FileSystems {
     }
   }
 
+  /********************************** METHODS FOR REGISTRATION **********************************/
+
+  /**
+   * Loads available {@link FileSystemRegistrar} services.
+   */
+  private static void loadFileSystemRegistrars() {
+    SCHEME_TO_REGISTRAR.clear();
+    Set<FileSystemRegistrar> registrars =
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
+    registrars.addAll(Lists.newArrayList(
+        ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
+
+    verifySchemesAreUnique(registrars);
+
+    for (FileSystemRegistrar registrar : registrars) {
+      SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar);
+    }
+  }
+
+  /**
+   * Sets the default configuration in workers.
+   *
+   * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+   */
+  public static void setDefaultConfigInWorkers(PipelineOptions options) {
+    defaultConfig = checkNotNull(options, "options");
+  }
+
   @VisibleForTesting
   static void verifySchemesAreUnique(Set<FileSystemRegistrar> registrars) {
     Multimap<String, FileSystemRegistrar> registrarsBySchemes =

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index fe6b643..1cad4b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -27,13 +27,13 @@ import com.google.common.collect.Lists;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
@@ -107,19 +107,13 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
       LocalResourceId src = srcResourceIds.get(i);
       LocalResourceId dst = destResourceIds.get(i);
       LOG.debug("Copying {} to {}", src, dst);
-      try {
-        // Copy the source file, replacing the existing destination.
-        // Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter.
-        Files.copy(
-            src.getPath(),
-            dst.getPath(),
-            StandardCopyOption.REPLACE_EXISTING,
-            StandardCopyOption.COPY_ATTRIBUTES);
-      } catch (NoSuchFileException e) {
-        LOG.debug("{} does not exist.", src);
-        // Suppress exception if file does not exist.
-        // TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile.
-      }
+      // Copy the source file, replacing the existing destination.
+      // Paths.get(x) will not work on Windows OSes because of the ":" after the drive letter.
+      Files.copy(
+          src.getPath(),
+          dst.getPath(),
+          StandardCopyOption.REPLACE_EXISTING,
+          StandardCopyOption.COPY_ATTRIBUTES);
     }
   }
 
@@ -137,19 +131,12 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
       LocalResourceId src = srcResourceIds.get(i);
       LocalResourceId dst = destResourceIds.get(i);
       LOG.debug("Renaming {} to {}", src, dst);
-      try {
-        // Rename the source file, replacing the existing destination.
-        Files.move(
-            src.getPath(),
-            dst.getPath(),
-            StandardCopyOption.REPLACE_EXISTING,
-            StandardCopyOption.COPY_ATTRIBUTES,
-            StandardCopyOption.ATOMIC_MOVE);
-      } catch (NoSuchFileException e) {
-        LOG.debug("{} does not exist.", src);
-        // Suppress exception if file does not exist.
-        // TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile.
-      }
+      // Rename the source file, replacing the existing destination.
+      Files.move(
+          src.getPath(),
+          dst.getPath(),
+          StandardCopyOption.REPLACE_EXISTING,
+          StandardCopyOption.ATOMIC_MOVE);
     }
   }
 
@@ -157,12 +144,7 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
   protected void delete(Collection<LocalResourceId> resourceIds) throws IOException {
     for (LocalResourceId resourceId : resourceIds) {
       LOG.debug("deleting file {}", resourceId);
-      // Delete the file if it exists.
-      // TODO: use Files.delete() once FileSystems supports ignoreMissingFile.
-      boolean exists = Files.deleteIfExists(resourceId.getPath());
-      if (!exists) {
-        LOG.debug("Tried to delete {}, but it did not exist", resourceId);
-      }
+      Files.delete(resourceId.getPath());
     }
   }
 
@@ -207,7 +189,14 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
     for (File match : matchedFiles) {
       result.add(toMetadata(match));
     }
-    return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()]));
+    if (result.isEmpty()) {
+      // TODO: consider returning Status.OK for globs.
+      return MatchResult.create(
+          Status.NOT_FOUND,
+          new FileNotFoundException(String.format("No files found for spec: %s.", spec)));
+    } else {
+      return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()]));
+    }
   }
 
   private Metadata toMetadata(File file) {

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
index 3f4dab2..2272a06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
@@ -116,7 +116,7 @@ class LocalResourceId implements ResourceId {
 
   @Override
   public String toString() {
-    return String.format("LocalResourceId: [%s]", path);
+    return path.toString();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
new file mode 100644
index 0000000..c5bd27e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * An object that configures {@link FileSystems#copy}, {@link FileSystems#rename},
+ * and {@link FileSystems#delete}.
+ */
+public interface MoveOptions {
+
+  /**
+   * Defines the standard {@link MoveOptions}.
+   */
+  enum StandardMoveOptions implements MoveOptions {
+    IGNORE_MISSING_FILES,
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index 2bdd660..938e24a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.fs;
 
+import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 
 /**
@@ -82,4 +83,11 @@ public interface ResourceId {
    * <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
    */
   String getScheme();
+
+  /**
+   * Returns the string representation of this {@link ResourceId}.
+   *
+   * <p>The corresponding {@link FileSystem#match} is required to accept this string representation.
+   */
+  String toString();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 434baf5..14781c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -752,11 +752,6 @@ public class GcsUtil {
 
       @Override
       public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-        if (errorExtractor.itemNotFound(e)) {
-          // Do nothing on item not found.
-          LOG.debug("{} does not exist, assuming this is a retry after deletion.", from);
-          return;
-        }
         throw new IOException(
             String.format("Error trying to copy %s to %s: %s", from, to, e));
       }
@@ -774,11 +769,6 @@ public class GcsUtil {
 
       @Override
       public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-        if (errorExtractor.itemNotFound(e)) {
-          // Do nothing on item not found.
-          LOG.debug("{} does not exist.", file);
-          return;
-        }
         throw new IOException(String.format("Error trying to delete %s: %s", file, e));
       }
     });

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index cfa2b85..8cfa3dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -17,16 +17,36 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+import java.io.Writer;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.List;
 import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.commons.lang3.SystemUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -37,17 +57,20 @@ import org.junit.runners.JUnit4;
 public class FileSystemsTest {
 
   @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  @Rule
   public ExpectedException thrown = ExpectedException.none();
+  private LocalFileSystem localFileSystem = new LocalFileSystem();
 
   @Test
   public void testGetLocalFileSystem() throws Exception {
-    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/"))
+    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/").getScheme())
         instanceof LocalFileSystem);
-    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home"))
+    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home").getScheme())
         instanceof LocalFileSystem);
-    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home"))
+    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home").getScheme())
         instanceof LocalFileSystem);
-    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home"))
+    assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home").getScheme())
         instanceof LocalFileSystem);
   }
 
@@ -71,6 +94,143 @@ public class FileSystemsTest {
             }));
   }
 
+  @Test
+  public void testDeleteThrowsNoSuchFileException() throws Exception {
+    // One real file plus a sibling name that is never created.
+    Path presentFile = temporaryFolder.newFile().toPath();
+    Path missingFile = presentFile.resolveSibling("non-existent");
+    createFileWithContent(presentFile, "content1");
+    List<ResourceId> targets =
+        toResourceIds(ImmutableList.of(presentFile, missingFile), false /* isDirectory */);
+    thrown.expect(NoSuchFileException.class);
+    FileSystems.delete(targets);
+  }
+
+  @Test
+  public void testDeleteIgnoreMissingFiles() throws Exception {
+    Path existingPath = temporaryFolder.newFile().toPath();
+    Path nonExistentPath = existingPath.resolveSibling("non-existent");
+    createFileWithContent(existingPath, "content1");
+    FileSystems.delete(
+        toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    // Skipping the missing entry must not prevent the real file from being deleted.
+    assertFalse(existingPath.toFile().exists());
+  }
+
+  @Test
+  public void testCopyThrowsNoSuchFileException() throws Exception {
+    // Sources: one real file and one sibling name that is never created.
+    Path presentSrc = temporaryFolder.newFile().toPath();
+    Path missingSrc = presentSrc.resolveSibling("non-existent");
+    createFileWithContent(presentSrc, "content1");
+    List<ResourceId> sources =
+        toResourceIds(ImmutableList.of(presentSrc, missingSrc), false /* isDirectory */);
+    // Destinations are siblings of the sources.
+    List<ResourceId> destinations = toResourceIds(
+        ImmutableList.of(presentSrc.resolveSibling("dest1"), missingSrc.resolveSibling("dest2")),
+        false /* isDirectory */);
+    thrown.expect(NoSuchFileException.class);
+    FileSystems.copy(sources, destinations);
+  }
+
+  @Test
+  public void testCopyIgnoreMissingFiles() throws Exception {
+    Path srcPath1 = temporaryFolder.newFile().toPath();
+    Path nonExistentPath = srcPath1.resolveSibling("non-existent");
+    Path srcPath3 = temporaryFolder.newFile().toPath();
+    // destPath2 is paired with the missing source, so it must never be created.
+    Path destPath1 = srcPath1.resolveSibling("dest1");
+    Path destPath2 = nonExistentPath.resolveSibling("dest2");
+    Path destPath3 = srcPath1.resolveSibling("dest3");
+
+    createFileWithContent(srcPath1, "content1");
+    createFileWithContent(srcPath3, "content3");
+
+    FileSystems.copy(
+        toResourceIds(
+            ImmutableList.of(srcPath1, nonExistentPath, srcPath3), false /* isDirectory */),
+        toResourceIds(ImmutableList.of(destPath1, destPath2, destPath3), false /* isDirectory */),
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    // Sources stay in place; verify the copied content at the destinations, not the sources.
+    assertTrue(srcPath1.toFile().exists());
+    assertTrue(srcPath3.toFile().exists());
+    assertThat(
+        Files.readLines(destPath1.toFile(), StandardCharsets.UTF_8),
+        containsInAnyOrder("content1"));
+    assertFalse(destPath2.toFile().exists());
+    assertThat(
+        Files.readLines(destPath3.toFile(), StandardCharsets.UTF_8),
+        containsInAnyOrder("content3"));
+  }
+
+  @Test
+  public void testRenameThrowsNoSuchFileException() throws Exception {
+    Path presentSrc = temporaryFolder.newFile().toPath();
+    Path absentSrc = presentSrc.resolveSibling("non-existent");
+    createFileWithContent(presentSrc, "content1");
+    Path presentDest = presentSrc.resolveSibling("dest1");
+    Path absentDest = absentSrc.resolveSibling("dest2");
+    List<ResourceId> sources =
+        toResourceIds(ImmutableList.of(presentSrc, absentSrc), false /* isDirectory */);
+    List<ResourceId> destinations =
+        toResourceIds(ImmutableList.of(presentDest, absentDest), false /* isDirectory */);
+    // No MoveOptions are passed, so the rename is expected to fail on the absent source.
+    thrown.expect(NoSuchFileException.class);
+    FileSystems.rename(sources, destinations);
+  }
+
+  @Test
+  public void testRenameIgnoreMissingFiles() throws Exception {
+    // Two real sources surround a sibling name that is never created.
+    Path firstSrc = temporaryFolder.newFile().toPath();
+    Path absentSrc = firstSrc.resolveSibling("non-existent");
+    Path thirdSrc = temporaryFolder.newFile().toPath();
+    createFileWithContent(firstSrc, "content1");
+    createFileWithContent(thirdSrc, "content3");
+
+    Path firstDest = firstSrc.resolveSibling("dest1");
+    Path absentDest = absentSrc.resolveSibling("dest2");
+    Path thirdDest = firstSrc.resolveSibling("dest3");
+    List<ResourceId> sources =
+        toResourceIds(ImmutableList.of(firstSrc, absentSrc, thirdSrc), false /* isDirectory */);
+    List<ResourceId> destinations =
+        toResourceIds(ImmutableList.of(firstDest, absentDest, thirdDest), false /* isDirectory */);
+    FileSystems.rename(
+        sources, destinations, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    // Renamed sources are gone; only sources that existed produce a destination.
+    assertFalse(firstSrc.toFile().exists());
+    assertFalse(thirdSrc.toFile().exists());
+    assertThat(
+        Files.readLines(firstDest.toFile(), StandardCharsets.UTF_8),
+        containsInAnyOrder("content1"));
+    assertFalse(absentDest.toFile().exists());
+    assertThat(
+        Files.readLines(thirdDest.toFile(), StandardCharsets.UTF_8),
+        containsInAnyOrder("content3"));
+  }
+
+  private List<ResourceId> toResourceIds(List<Path> paths, final boolean isDirectory) {
+    // Adapter from a local Path to a ResourceId carrying the requested isDirectory flag.
+    Function<Path, ResourceId> toLocalResource = new Function<Path, ResourceId>() {
+      @Override
+      public ResourceId apply(Path localPath) {
+        return LocalResourceId.fromPath(localPath, isDirectory);
+      }
+    };
+    return FluentIterable.from(paths).transform(toLocalResource).toList();
+  }
+
+  private void createFileWithContent(Path path, String content) throws Exception {
+    LocalResourceId target = LocalResourceId.fromPath(path, false /* isDirectory */);
+    CreateOptions textOptions =
+        CreateOptions.StandardCreateOptions.builder().setMimeType(MimeTypes.TEXT).build();
+    try (Writer out = Channels.newWriter(
+        localFileSystem.create(target, textOptions), StandardCharsets.UTF_8.name())) {
+      out.write(content);  // the writer is closed by try-with-resources
+    }
+  }
+
   private LocalResourceId toLocalResourceId(String str) throws Exception {
     boolean isDirectory;
     if (SystemUtils.IS_OS_WINDOWS) {

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
index 74f8b72..bb5928e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
@@ -168,28 +168,24 @@ public class LocalFileSystemTest {
 
   @Test
   public void testMatchPatternNone() throws Exception {
-    List<String> expected = ImmutableList.of();
     temporaryFolder.newFile("a");
     temporaryFolder.newFile("aa");
     temporaryFolder.newFile("ab");
 
     List<MatchResult> matchResults =
         matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("b"), "*");
-    assertThat(
-        toFilenames(matchResults),
-        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+    assertEquals(1, matchResults.size());
+    assertEquals(MatchResult.Status.NOT_FOUND, matchResults.get(0).status());
   }
 
   @Test
   public void testMatchForNonExistentFile() throws Exception {
-    List<String> expected = ImmutableList.of();
     temporaryFolder.newFile("aa");
 
     List<MatchResult> matchResults = localFileSystem.match(
         ImmutableList.of(temporaryFolder.getRoot().toPath().resolve("a").toString()));
-    assertThat(
-        toFilenames(matchResults),
-        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+    assertEquals(1, matchResults.size());
+    assertEquals(MatchResult.Status.NOT_FOUND, matchResults.get(0).status());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
index aecc5c9..a1ac827 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
@@ -97,7 +97,7 @@ public class GcsResourceId implements ResourceId {
 
   @Override
   public String toString() {
-    return String.format("GcsResourceId: [%s]", gcsPath);
+    return gcsPath.toString();
   }
 
   @Override


[2/2] beam git commit: This closes #2175

Posted by pe...@apache.org.
This closes #2175


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/769398e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/769398e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/769398e4

Branch: refs/heads/master
Commit: 769398e403b9632562544ca752ee528acfceec5a
Parents: 5363431 d07ef53
Author: Pei He <pe...@gmail.com>
Authored: Thu Mar 30 19:25:09 2017 +0800
Committer: Pei He <pe...@gmail.com>
Committed: Thu Mar 30 19:25:09 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileSystems.java     | 302 +++++++++++++++++--
 .../org/apache/beam/sdk/io/LocalFileSystem.java |  57 ++--
 .../org/apache/beam/sdk/io/LocalResourceId.java |   2 +-
 .../org/apache/beam/sdk/io/fs/MoveOptions.java  |  34 +++
 .../org/apache/beam/sdk/io/fs/ResourceId.java   |   8 +
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  10 -
 .../org/apache/beam/sdk/io/FileSystemsTest.java | 168 ++++++++++-
 .../apache/beam/sdk/io/LocalFileSystemTest.java |  12 +-
 .../beam/sdk/io/gcp/storage/GcsResourceId.java  |   2 +-
 9 files changed, 516 insertions(+), 79 deletions(-)
----------------------------------------------------------------------