You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/03/30 11:30:54 UTC
[1/2] beam git commit: [BEAM-59] Beam FileSystems: add match(), copy(),
rename(), delete() utilities.
Repository: beam
Updated Branches:
refs/heads/master 536343142 -> 769398e40
[BEAM-59] Beam FileSystems: add match(), copy(), rename(), delete() utilities.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d07ef530
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d07ef530
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d07ef530
Branch: refs/heads/master
Commit: d07ef530dac17e9064cd0003470163ccbef7dfef
Parents: 5363431
Author: Pei He <pe...@gmail.com>
Authored: Fri Mar 10 14:15:18 2017 +0800
Committer: Pei He <pe...@gmail.com>
Committed: Thu Mar 30 19:23:56 2017 +0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileSystems.java | 302 +++++++++++++++++--
.../org/apache/beam/sdk/io/LocalFileSystem.java | 57 ++--
.../org/apache/beam/sdk/io/LocalResourceId.java | 2 +-
.../org/apache/beam/sdk/io/fs/MoveOptions.java | 34 +++
.../org/apache/beam/sdk/io/fs/ResourceId.java | 8 +
.../java/org/apache/beam/sdk/util/GcsUtil.java | 10 -
.../org/apache/beam/sdk/io/FileSystemsTest.java | 168 ++++++++++-
.../apache/beam/sdk/io/LocalFileSystemTest.java | 12 +-
.../beam/sdk/io/gcp/storage/GcsResourceId.java | 2 +-
9 files changed, 516 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index b39e517..96306dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -17,32 +17,46 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
+
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javax.annotation.Nonnull;
+
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.KV;
/**
* Clients facing {@link FileSystem} utility.
@@ -50,6 +64,8 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
public class FileSystems {
public static final String DEFAULT_SCHEME = "default";
+ private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
+ "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");
private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
new ConcurrentHashMap<>();
@@ -63,6 +79,60 @@ public class FileSystems {
/********************************** METHODS FOR CLIENT **********************************/
/**
+ * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
+ * Callers should use {@link #match} to resolve ambiguities in user-provided specs before
+ * calling other methods.
+ *
+ * <p>Implementation handles the following ambiguities of a user-provided spec:
+ * <ol>
+ * <li>{@code spec} could be a glob or a uri. {@link #match} should be able to tell and
+ * choose efficient implementations.
+ * <li>The user-provided {@code spec} might refer to files or directories. It is common that
+ * users that wish to indicate a directory will omit the trailing path delimiter, such as
+ * {@code "/tmp/dir"} in Linux. The {@link FileSystem} should be able to recognize a directory
+ * with the trailing path delimiter omitted, but should always return a correct {@link ResourceId}
+ * (e.g., {@code "/tmp/dir/"}) inside the returned {@link MatchResult}.
+ * </ol>
+ *
+ * <p>All {@link FileSystem} implementations should support glob in the final hierarchical path
+ * component of {@link ResourceId}. This allows SDK libraries to construct file system agnostic
+ * spec. {@link FileSystem FileSystems} can support additional patterns for user-provided specs.
+ *
+ * @return {@code List<MatchResult>} in the same order as the input specs.
+ *
+ * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes.
+ * @throws IOException if all specs failed to match due to issues like:
+ * network connection, authorization.
+ * Exceptions for individual specs are deferred until callers retrieve
+ * metadata with {@link MatchResult#metadata()}.
+ */
+ public static List<MatchResult> match(List<String> specs) throws IOException {
+ return getFileSystemInternal(getOnlyScheme(specs)).match(specs);
+ }
+
+ /**
+ * Returns {@link MatchResult MatchResults} for the given {@link ResourceId resourceIds}.
+ *
+ * @param resourceIds {@link ResourceId resourceIds} that might be derived from {@link #match},
+ * {@link ResourceId#resolve}, or {@link ResourceId#getCurrentDirectory()}.
+ *
+ * @throws IOException if all {@code resourceIds} failed to match due to issues like:
+ * network connection, authorization.
+ * Exceptions for individual {@link ResourceId ResourceIds} need to be deferred until callers retrieve
+ * metadata with {@link MatchResult#metadata()}.
+ */
+ public static List<MatchResult> matchResources(List<ResourceId> resourceIds) throws IOException {
+ return match(FluentIterable
+ .from(resourceIds)
+ .transform(new Function<ResourceId, String>() {
+ @Override
+ public String apply(@Nonnull ResourceId resourceId) {
+ return resourceId.toString();
+ }})
+ .toList());
+ }
+
+ /**
* Returns a write channel for the given {@link ResourceId}.
*
* <p>The resource is not expanded; it is used verbatim.
@@ -85,7 +155,7 @@ public class FileSystems {
*/
public static WritableByteChannel create(ResourceId resourceId, CreateOptions createOptions)
throws IOException {
- return getFileSystemInternal(resourceId).create(resourceId, createOptions);
+ return getFileSystemInternal(resourceId.getScheme()).create(resourceId, createOptions);
}
/**
@@ -99,44 +169,206 @@ public class FileSystems {
* @param resourceId the reference of the file-like resource to open
*/
public static ReadableByteChannel open(ResourceId resourceId) throws IOException {
- return getFileSystemInternal(resourceId).open(resourceId);
+ return getFileSystemInternal(resourceId.getScheme()).open(resourceId);
}
- /********************************** METHODS FOR REGISTRATION **********************************/
-
/**
- * Loads available {@link FileSystemRegistrar} services.
+ * Copies a {@link List} of file-like resources from one location to another.
+ *
+ * <p>The number of source resources must equal the number of destination resources.
+ * Destination resources will be created recursively.
+ *
+ * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
+ *
+ * <p>It doesn't support copying globs.
+ *
+ * @param srcResourceIds the references of the source resources
+ * @param destResourceIds the references of the destination resources
*/
- private static void loadFileSystemRegistrars() {
- SCHEME_TO_REGISTRAR.clear();
- Set<FileSystemRegistrar> registrars =
- Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
- registrars.addAll(Lists.newArrayList(
- ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
+ public static void copy(
+ List<ResourceId> srcResourceIds,
+ List<ResourceId> destResourceIds,
+ MoveOptions... moveOptions) throws IOException {
+ validateOnlyScheme(srcResourceIds, destResourceIds);
- verifySchemesAreUnique(registrars);
+ List<ResourceId> srcToCopy;
+ List<ResourceId> destToCopy;
+ if (Sets.newHashSet(moveOptions).contains(
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
+ KV<List<ResourceId>, List<ResourceId>> existings =
+ filterMissingFiles(srcResourceIds, destResourceIds);
+ srcToCopy = existings.getKey();
+ destToCopy = existings.getValue();
+ } else {
+ srcToCopy = srcResourceIds;
+ destToCopy = destResourceIds;
+ }
+ if (srcToCopy.isEmpty()) {
+ return;
+ }
+ getFileSystemInternal(srcToCopy.iterator().next().getScheme())
+ .copy(srcToCopy, destToCopy);
+ }
- for (FileSystemRegistrar registrar : registrars) {
- SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar);
+ /**
+ * Renames a {@link List} of file-like resources from one location to another.
+ *
+ * <p>The number of source resources must equal the number of destination resources.
+ * Destination resources will be created recursively.
+ *
+ * <p>{@code srcResourceIds} and {@code destResourceIds} must have the same scheme.
+ *
+ * <p>It doesn't support renaming globs.
+ *
+ * @param srcResourceIds the references of the source resources
+ * @param destResourceIds the references of the destination resources
+ */
+ public static void rename(
+ List<ResourceId> srcResourceIds,
+ List<ResourceId> destResourceIds,
+ MoveOptions... moveOptions) throws IOException {
+ validateOnlyScheme(srcResourceIds, destResourceIds);
+ List<ResourceId> srcToRename;
+ List<ResourceId> destToRename;
+
+ if (Sets.newHashSet(moveOptions).contains(
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
+ KV<List<ResourceId>, List<ResourceId>> existings =
+ filterMissingFiles(srcResourceIds, destResourceIds);
+ srcToRename = existings.getKey();
+ destToRename = existings.getValue();
+ } else {
+ srcToRename = srcResourceIds;
+ destToRename = destResourceIds;
+ }
+ if (srcToRename.isEmpty()) {
+ return;
}
+ getFileSystemInternal(srcToRename.iterator().next().getScheme())
+ .rename(srcToRename, destToRename);
}
/**
- * Sets the default configuration in workers.
+ * Deletes a collection of resources.
*
- * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+ * <p>It is allowed but not recommended to delete directories recursively.
+ * Callers depend on {@link FileSystems} and use {@code DeleteOptions}.
+ *
+ * <p>{@code resourceIds} must have the same scheme.
+ *
+ * @param resourceIds the references of the resources to delete.
*/
- public static void setDefaultConfigInWorkers(PipelineOptions options) {
- defaultConfig = checkNotNull(options, "options");
+ public static void delete(
+ Collection<ResourceId> resourceIds, MoveOptions... moveOptions) throws IOException {
+ Collection<ResourceId> resourceIdsToDelete;
+ if (Sets.newHashSet(moveOptions).contains(
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
+ resourceIdsToDelete = FluentIterable
+ .from(matchResources(Lists.newArrayList(resourceIds)))
+ .filter(new Predicate<MatchResult>() {
+ @Override
+ public boolean apply(@Nonnull MatchResult matchResult) {
+ return !matchResult.status().equals(MatchResult.Status.NOT_FOUND);
+ }})
+ .transformAndConcat(new Function<MatchResult, Iterable<Metadata>>() {
+ @Nonnull
+ @Override
+ public Iterable<Metadata> apply(@Nonnull MatchResult input) {
+ try {
+ return Lists.newArrayList(input.metadata());
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to get metadata from MatchResult: %s.", input),
+ e);
+ }
+ }})
+ .transform(new Function<Metadata, ResourceId>() {
+ @Nonnull
+ @Override
+ public ResourceId apply(@Nonnull Metadata input) {
+ return input.resourceId();
+ }})
+ .toList();
+ } else {
+ resourceIdsToDelete = resourceIds;
+ }
+ if (resourceIdsToDelete.isEmpty()) {
+ return;
+ }
+ getFileSystemInternal(resourceIdsToDelete.iterator().next().getScheme())
+ .delete(resourceIdsToDelete);
+ }
+
+ private static KV<List<ResourceId>, List<ResourceId>> filterMissingFiles(
+ List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) throws IOException {
+ List<ResourceId> srcToHandle = new ArrayList<>();
+ List<ResourceId> destToHandle = new ArrayList<>();
+
+ List<MatchResult> matchResults = matchResources(srcResourceIds);
+ for (int i = 0; i < matchResults.size(); ++i) {
+ if (!matchResults.get(i).status().equals(Status.NOT_FOUND)) {
+ srcToHandle.add(srcResourceIds.get(i));
+ destToHandle.add(destResourceIds.get(i));
+ }
+ }
+ return KV.of(srcToHandle, destToHandle);
+ }
+
+ private static void validateOnlyScheme(
+ List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) {
+ checkArgument(
+ srcResourceIds.size() == destResourceIds.size(),
+ "Number of source resource ids %s must equal number of destination resource ids %s",
+ srcResourceIds.size(),
+ destResourceIds.size());
+ Set<String> schemes = FluentIterable.from(srcResourceIds)
+ .append(destResourceIds)
+ .transform(new Function<ResourceId, String>() {
+ @Override
+ public String apply(@Nonnull ResourceId resourceId) {
+ return resourceId.getScheme();
+ }})
+ .toSet();
+ checkArgument(
+ schemes.size() == 1,
+ String.format(
+ "Expect srcResourceIds and destResourceIds have the same scheme, but received %s.",
+ Joiner.on(", ").join(schemes)));
+ }
+
+ private static String getOnlyScheme(List<String> specs) {
+ checkArgument(!specs.isEmpty(), "Expect specs are not empty.");
+ Set<String> schemes = FluentIterable.from(specs)
+ .transform(new Function<String, String>() {
+ @Override
+ public String apply(String spec) {
+ return parseScheme(spec);
+ }})
+ .toSet();
+ return Iterables.getOnlyElement(schemes);
+ }
+
+ private static String parseScheme(String spec) {
+ // The spec is almost, but not quite, a URI. In particular,
+ // the reserved characters '[', ']', and '?' have meanings that differ
+ // from their use in the URI spec. ('*' is not reserved).
+ // Here, we just need the scheme, which is so circumscribed as to be
+ // very easy to extract with a regex.
+ Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
+
+ if (!matcher.matches()) {
+ return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
+ } else {
+ return matcher.group("scheme").toLowerCase();
+ }
}
/**
* Internal method to get {@link FileSystem} for {@code spec}.
*/
@VisibleForTesting
- static FileSystem getFileSystemInternal(ResourceId resourceId) {
- String lowerCaseScheme = resourceId.getScheme().toLowerCase();
- return getRegistrarInternal(lowerCaseScheme).fromOptions(defaultConfig);
+ static FileSystem getFileSystemInternal(String scheme) {
+ return getRegistrarInternal(scheme.toLowerCase()).fromOptions(defaultConfig);
}
/**
@@ -154,6 +386,34 @@ public class FileSystems {
}
}
+ /********************************** METHODS FOR REGISTRATION **********************************/
+
+ /**
+ * Loads available {@link FileSystemRegistrar} services.
+ */
+ private static void loadFileSystemRegistrars() {
+ SCHEME_TO_REGISTRAR.clear();
+ Set<FileSystemRegistrar> registrars =
+ Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
+ registrars.addAll(Lists.newArrayList(
+ ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
+
+ verifySchemesAreUnique(registrars);
+
+ for (FileSystemRegistrar registrar : registrars) {
+ SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar);
+ }
+ }
+
+ /**
+ * Sets the default configuration in workers.
+ *
+ * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+ */
+ public static void setDefaultConfigInWorkers(PipelineOptions options) {
+ defaultConfig = checkNotNull(options, "options");
+ }
+
@VisibleForTesting
static void verifySchemesAreUnique(Set<FileSystemRegistrar> registrars) {
Multimap<String, FileSystemRegistrar> registrarsBySchemes =
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index fe6b643..1cad4b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -27,13 +27,13 @@ import com.google.common.collect.Lists;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
@@ -107,19 +107,13 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
LocalResourceId src = srcResourceIds.get(i);
LocalResourceId dst = destResourceIds.get(i);
LOG.debug("Copying {} to {}", src, dst);
- try {
- // Copy the source file, replacing the existing destination.
- // Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter.
- Files.copy(
- src.getPath(),
- dst.getPath(),
- StandardCopyOption.REPLACE_EXISTING,
- StandardCopyOption.COPY_ATTRIBUTES);
- } catch (NoSuchFileException e) {
- LOG.debug("{} does not exist.", src);
- // Suppress exception if file does not exist.
- // TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile.
- }
+ // Copy the source file, replacing the existing destination.
+ // Paths.get(x) will not work on Windows OSes because of the ":" after the drive letter.
+ Files.copy(
+ src.getPath(),
+ dst.getPath(),
+ StandardCopyOption.REPLACE_EXISTING,
+ StandardCopyOption.COPY_ATTRIBUTES);
}
}
@@ -137,19 +131,12 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
LocalResourceId src = srcResourceIds.get(i);
LocalResourceId dst = destResourceIds.get(i);
LOG.debug("Renaming {} to {}", src, dst);
- try {
- // Rename the source file, replacing the existing destination.
- Files.move(
- src.getPath(),
- dst.getPath(),
- StandardCopyOption.REPLACE_EXISTING,
- StandardCopyOption.COPY_ATTRIBUTES,
- StandardCopyOption.ATOMIC_MOVE);
- } catch (NoSuchFileException e) {
- LOG.debug("{} does not exist.", src);
- // Suppress exception if file does not exist.
- // TODO: re-throw FileNotFoundException once FileSystems supports ignoreMissingFile.
- }
+ // Rename the source file, replacing the existing destination.
+ Files.move(
+ src.getPath(),
+ dst.getPath(),
+ StandardCopyOption.REPLACE_EXISTING,
+ StandardCopyOption.ATOMIC_MOVE);
}
}
@@ -157,12 +144,7 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
protected void delete(Collection<LocalResourceId> resourceIds) throws IOException {
for (LocalResourceId resourceId : resourceIds) {
LOG.debug("deleting file {}", resourceId);
- // Delete the file if it exists.
- // TODO: use Files.delete() once FileSystems supports ignoreMissingFile.
- boolean exists = Files.deleteIfExists(resourceId.getPath());
- if (!exists) {
- LOG.debug("Tried to delete {}, but it did not exist", resourceId);
- }
+ Files.delete(resourceId.getPath());
}
}
@@ -207,7 +189,14 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
for (File match : matchedFiles) {
result.add(toMetadata(match));
}
- return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()]));
+ if (result.isEmpty()) {
+ // TODO: consider returning Status.OK for globs.
+ return MatchResult.create(
+ Status.NOT_FOUND,
+ new FileNotFoundException(String.format("No files found for spec: %s.", spec)));
+ } else {
+ return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()]));
+ }
}
private Metadata toMetadata(File file) {
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
index 3f4dab2..2272a06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
@@ -116,7 +116,7 @@ class LocalResourceId implements ResourceId {
@Override
public String toString() {
- return String.format("LocalResourceId: [%s]", path);
+ return path.toString();
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
new file mode 100644
index 0000000..c5bd27e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * An object that configures {@link FileSystems#copy}, {@link FileSystems#rename},
+ * and {@link FileSystems#delete}.
+ */
+public interface MoveOptions {
+
+ /**
+ * Defines the standard {@link MoveOptions}.
+ */
+ enum StandardMoveOptions implements MoveOptions {
+ IGNORE_MISSING_FILES,
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index 2bdd660..938e24a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.fs;
+import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
/**
@@ -82,4 +83,11 @@ public interface ResourceId {
* <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
*/
String getScheme();
+
+ /**
+ * Returns the string representation of this {@link ResourceId}.
+ *
+ * <p>The corresponding {@link FileSystem#match} is required to accept this string representation.
+ */
+ String toString();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 434baf5..14781c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -752,11 +752,6 @@ public class GcsUtil {
@Override
public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
- if (errorExtractor.itemNotFound(e)) {
- // Do nothing on item not found.
- LOG.debug("{} does not exist, assuming this is a retry after deletion.", from);
- return;
- }
throw new IOException(
String.format("Error trying to copy %s to %s: %s", from, to, e));
}
@@ -774,11 +769,6 @@ public class GcsUtil {
@Override
public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
- if (errorExtractor.itemNotFound(e)) {
- // Do nothing on item not found.
- LOG.debug("{} does not exist.", file);
- return;
- }
throw new IOException(String.format("Error trying to delete %s: %s", file, e));
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index cfa2b85..8cfa3dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -17,16 +17,36 @@
*/
package org.apache.beam.sdk.io;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+import java.io.Writer;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.List;
import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.MimeTypes;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -37,17 +57,20 @@ import org.junit.runners.JUnit4;
public class FileSystemsTest {
@Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
public ExpectedException thrown = ExpectedException.none();
+ private LocalFileSystem localFileSystem = new LocalFileSystem();
@Test
public void testGetLocalFileSystem() throws Exception {
- assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/"))
+ assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("~/home/").getScheme())
instanceof LocalFileSystem);
- assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home"))
+ assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("file://home").getScheme())
instanceof LocalFileSystem);
- assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home"))
+ assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("FILE://home").getScheme())
instanceof LocalFileSystem);
- assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home"))
+ assertTrue(FileSystems.getFileSystemInternal(toLocalResourceId("File://home").getScheme())
instanceof LocalFileSystem);
}
@@ -71,6 +94,143 @@ public class FileSystemsTest {
}));
}
+ @Test
+ public void testDeleteThrowsNoSuchFileException() throws Exception {
+ Path existingPath = temporaryFolder.newFile().toPath();
+ Path nonExistentPath = existingPath.resolveSibling("non-existent");
+
+ createFileWithContent(existingPath, "content1");
+
+ thrown.expect(NoSuchFileException.class);
+ FileSystems.delete(
+ toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */));
+ }
+
+ @Test
+ public void testDeleteIgnoreMissingFiles() throws Exception {
+ Path existingPath = temporaryFolder.newFile().toPath();
+ Path nonExistentPath = existingPath.resolveSibling("non-existent");
+
+ createFileWithContent(existingPath, "content1");
+
+ FileSystems.delete(
+ toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+ }
+
+ @Test
+ public void testCopyThrowsNoSuchFileException() throws Exception {
+ Path existingPath = temporaryFolder.newFile().toPath();
+ Path nonExistentPath = existingPath.resolveSibling("non-existent");
+
+ Path destPath1 = existingPath.resolveSibling("dest1");
+ Path destPath2 = nonExistentPath.resolveSibling("dest2");
+
+ createFileWithContent(existingPath, "content1");
+
+ thrown.expect(NoSuchFileException.class);
+ FileSystems.copy(
+ toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
+ toResourceIds(ImmutableList.of(destPath1, destPath2), false /* isDirectory */));
+ }
+
+ @Test
+ public void testCopyIgnoreMissingFiles() throws Exception {
+ Path srcPath1 = temporaryFolder.newFile().toPath();
+ Path nonExistentPath = srcPath1.resolveSibling("non-existent");
+ Path srcPath3 = temporaryFolder.newFile().toPath();
+
+ Path destPath1 = srcPath1.resolveSibling("dest1");
+ Path destPath2 = nonExistentPath.resolveSibling("dest2");
+ Path destPath3 = srcPath1.resolveSibling("dest3");
+
+ createFileWithContent(srcPath1, "content1");
+ createFileWithContent(srcPath3, "content3");
+
+ FileSystems.copy(
+ toResourceIds(
+ ImmutableList.of(srcPath1, nonExistentPath, srcPath3), false /* isDirectory */),
+ toResourceIds(ImmutableList.of(destPath1, destPath2, destPath3), false /* isDirectory */),
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+
+ assertTrue(srcPath1.toFile().exists());
+ assertTrue(srcPath3.toFile().exists());
+ assertThat(
+ Files.readLines(srcPath1.toFile(), StandardCharsets.UTF_8),
+ containsInAnyOrder("content1"));
+ assertFalse(destPath2.toFile().exists());
+ assertThat(
+ Files.readLines(srcPath3.toFile(), StandardCharsets.UTF_8),
+ containsInAnyOrder("content3"));
+ }
+
+ @Test
+ public void testRenameThrowsNoSuchFileException() throws Exception {
+ Path existingPath = temporaryFolder.newFile().toPath();
+ Path nonExistentPath = existingPath.resolveSibling("non-existent");
+
+ Path destPath1 = existingPath.resolveSibling("dest1");
+ Path destPath2 = nonExistentPath.resolveSibling("dest2");
+
+ createFileWithContent(existingPath, "content1");
+
+ thrown.expect(NoSuchFileException.class);
+ FileSystems.rename(
+ toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
+ toResourceIds(ImmutableList.of(destPath1, destPath2), false /* isDirectory */));
+ }
+
+ @Test
+ public void testRenameIgnoreMissingFiles() throws Exception {
+ Path srcPath1 = temporaryFolder.newFile().toPath();
+ Path nonExistentPath = srcPath1.resolveSibling("non-existent");
+ Path srcPath3 = temporaryFolder.newFile().toPath();
+
+ Path destPath1 = srcPath1.resolveSibling("dest1");
+ Path destPath2 = nonExistentPath.resolveSibling("dest2");
+ Path destPath3 = srcPath1.resolveSibling("dest3");
+
+ createFileWithContent(srcPath1, "content1");
+ createFileWithContent(srcPath3, "content3");
+
+ FileSystems.rename(
+ toResourceIds(
+ ImmutableList.of(srcPath1, nonExistentPath, srcPath3), false /* isDirectory */),
+ toResourceIds(ImmutableList.of(destPath1, destPath2, destPath3), false /* isDirectory */),
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+
+ assertFalse(srcPath1.toFile().exists());
+ assertFalse(srcPath3.toFile().exists());
+ assertThat(
+ Files.readLines(destPath1.toFile(), StandardCharsets.UTF_8),
+ containsInAnyOrder("content1"));
+ assertFalse(destPath2.toFile().exists());
+ assertThat(
+ Files.readLines(destPath3.toFile(), StandardCharsets.UTF_8),
+ containsInAnyOrder("content3"));
+ }
+
+ private List<ResourceId> toResourceIds(List<Path> paths, final boolean isDirectory) {
+ return FluentIterable
+ .from(paths)
+ .transform(new Function<Path, ResourceId>() {
+ @Override
+ public ResourceId apply(Path path) {
+ return LocalResourceId.fromPath(path, isDirectory);
+ }})
+ .toList();
+ }
+
+ private void createFileWithContent(Path path, String content) throws Exception {
+ try (Writer writer = Channels.newWriter(
+ localFileSystem.create(
+ LocalResourceId.fromPath(path, false /* isDirectory */),
+ CreateOptions.StandardCreateOptions.builder().setMimeType(MimeTypes.TEXT).build()),
+ StandardCharsets.UTF_8.name())) {
+ writer.write(content);
+ }
+ }
+
private LocalResourceId toLocalResourceId(String str) throws Exception {
boolean isDirectory;
if (SystemUtils.IS_OS_WINDOWS) {
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
index 74f8b72..bb5928e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
@@ -168,28 +168,24 @@ public class LocalFileSystemTest {
@Test
public void testMatchPatternNone() throws Exception {
- List<String> expected = ImmutableList.of();
temporaryFolder.newFile("a");
temporaryFolder.newFile("aa");
temporaryFolder.newFile("ab");
List<MatchResult> matchResults =
matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("b"), "*");
- assertThat(
- toFilenames(matchResults),
- containsInAnyOrder(expected.toArray(new String[expected.size()])));
+ assertEquals(1, matchResults.size());
+ assertEquals(MatchResult.Status.NOT_FOUND, matchResults.get(0).status());
}
@Test
public void testMatchForNonExistentFile() throws Exception {
- List<String> expected = ImmutableList.of();
temporaryFolder.newFile("aa");
List<MatchResult> matchResults = localFileSystem.match(
ImmutableList.of(temporaryFolder.getRoot().toPath().resolve("a").toString()));
- assertThat(
- toFilenames(matchResults),
- containsInAnyOrder(expected.toArray(new String[expected.size()])));
+ assertEquals(1, matchResults.size());
+ assertEquals(MatchResult.Status.NOT_FOUND, matchResults.get(0).status());
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/d07ef530/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
index aecc5c9..a1ac827 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
@@ -97,7 +97,7 @@ public class GcsResourceId implements ResourceId {
@Override
public String toString() {
- return String.format("GcsResourceId: [%s]", gcsPath);
+ return gcsPath.toString();
}
@Override
[2/2] beam git commit: This closes #2175
Posted by pe...@apache.org.
This closes #2175
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/769398e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/769398e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/769398e4
Branch: refs/heads/master
Commit: 769398e403b9632562544ca752ee528acfceec5a
Parents: 5363431 d07ef53
Author: Pei He <pe...@gmail.com>
Authored: Thu Mar 30 19:25:09 2017 +0800
Committer: Pei He <pe...@gmail.com>
Committed: Thu Mar 30 19:25:09 2017 +0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileSystems.java | 302 +++++++++++++++++--
.../org/apache/beam/sdk/io/LocalFileSystem.java | 57 ++--
.../org/apache/beam/sdk/io/LocalResourceId.java | 2 +-
.../org/apache/beam/sdk/io/fs/MoveOptions.java | 34 +++
.../org/apache/beam/sdk/io/fs/ResourceId.java | 8 +
.../java/org/apache/beam/sdk/util/GcsUtil.java | 10 -
.../org/apache/beam/sdk/io/FileSystemsTest.java | 168 ++++++++++-
.../apache/beam/sdk/io/LocalFileSystemTest.java | 12 +-
.../beam/sdk/io/gcp/storage/GcsResourceId.java | 2 +-
9 files changed, 516 insertions(+), 79 deletions(-)
----------------------------------------------------------------------