You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/27 21:56:31 UTC
[1/2] beam git commit: [BEAM-59] Add FileSystems#matchNewResource
Repository: beam
Updated Branches:
refs/heads/master fdf2de999 -> 14a90d7c5
[BEAM-59] Add FileSystems#matchNewResource
The new FileSystems API needs a way to generate a ResourceId for a
resource that does not exist. This does not come up in sources, because
we typically are just matching existing files. However, sinks need the
ability to reference a new directory (say, in order to create it).
I couldn't think of anything better than a simple function that says
"treat this string as a full resource path with the specified type",
which is what FileSystems#matchNewResource provides.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f93a2775
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f93a2775
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f93a2775
Branch: refs/heads/master
Commit: f93a27755921e73141fd97de042e6dcd04e10a47
Parents: fdf2de9
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 25 17:29:02 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 14:51:04 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/FileSystem.java | 12 +++++++
.../org/apache/beam/sdk/io/FileSystems.java | 15 +++++++++
.../org/apache/beam/sdk/io/LocalFileSystem.java | 7 ++++
.../org/apache/beam/sdk/io/fs/ResourceId.java | 17 +++++++++-
.../apache/beam/sdk/io/LocalFileSystemTest.java | 22 ++++++++++++
.../beam/sdk/io/gcp/storage/GcsFileSystem.java | 35 +++++++++++++++-----
.../beam/sdk/io/hdfs/HadoopFileSystem.java | 5 +++
7 files changed, 104 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 001f596..76c5dc1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -139,4 +139,16 @@ public abstract class FileSystem<ResourceIdT extends ResourceId> {
* to determine the state of the resources.
*/
protected abstract void delete(Collection<ResourceIdT> resourceIds) throws IOException;
+
+ /**
+ * Returns a new {@link ResourceId} for this filesystem that represents the named resource.
+ * The caller supplies both the resource spec and whether that resource is a directory.
+ *
+ * <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
+ * any necessary escaping, for this {@link FileSystem}.
+ *
+ * <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
+ * such as when the specified {@code singleResourceSpec} is not a valid resource name.
+ */
+ protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 0b50070..b290498 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -485,4 +485,19 @@ public class FileSystems {
}
}
}
+
+ /**
+ * Returns a new {@link ResourceId} that represents the named resource, as a directory if
+ * {@code isDirectory} is true and as a file otherwise.
+ *
+ * <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
+ * any necessary escaping, for the {@link FileSystem} selected by the spec's scheme.
+ *
+ * <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
+ * such as when the specified {@code singleResourceSpec} is not a valid resource name.
+ */
+ public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+ return getFileSystemInternal(parseScheme(singleResourceSpec))
+ .matchNewResource(singleResourceSpec, isDirectory);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 8349a35..2d80ae4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -34,6 +34,7 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
@@ -164,6 +165,18 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
}
}
+ @Override
+ protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+ // Like GcsFileSystem, reject a file spec that names a directory instead of silently
+ // dropping the trailing separator (Paths.get normalizes it away).
+ if (!isDirectory && singleResourceSpec.endsWith(File.separator)) {
+ throw new IllegalArgumentException(String.format(
+ "Expected a file path, but [%s] ends with '%s'.", singleResourceSpec, File.separator));
+ }
+ Path path = Paths.get(singleResourceSpec);
+ return LocalResourceId.fromPath(path, isDirectory);
+ }
+
private MatchResult matchOne(String spec) throws IOException {
File file = Paths.get(spec).toFile();
http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index b7859ca..26a21bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.fs;
import java.io.Serializable;
import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
/**
@@ -27,7 +28,21 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
* <p>{@link ResourceId} is hierarchical and composed of a sequence of directory
* and file name elements separated by a special separator or delimiter.
*
- * <p>TODO: add examples for how ResourceId is constructed and used.
+ * <p>{@link ResourceId ResourceIds} are created using {@link FileSystems}. The two primary
+ * mechanisms are:
+ *
+ * <ul>
+ * <li>{@link FileSystems#match(java.util.List)}, which takes a list of {@link String} resource
+ * names or globs, queries the {@link FileSystem} for resources matching these specifications,
+ * and returns a {@link MatchResult} for each specification. This is typically used when
+ * reading from files.
+ *
+ * <li>{@link FileSystems#matchNewResource(String, boolean)}, which takes a {@link String} full
+ * resource name and type (file or directory) and generates a {@link FileSystem}-specific
+ * {@code ResourceId} for that resource. This call does not verify the presence or absence of that
+ * resource in the file system. This call is typically used when creating new directories or files
+ * to generate {@link ResourceId ResourceIds} for resources that may not yet exist.
+ * </ul>
*/
public interface ResourceId extends Serializable {
http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
index d335974..ac4fe61 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
@@ -18,8 +18,10 @@
package org.apache.beam.sdk.io;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -301,6 +303,24 @@ public class LocalFileSystemTest {
toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty());
}
+ @Test
+ public void testMatchNewResource() throws Exception {
+ LocalResourceId fileResource =
+ localFileSystem
+ .matchNewResource("/some/test/resource/path", false /* isDirectory */);
+ LocalResourceId dirResource =
+ localFileSystem
+ .matchNewResource("/some/test/resource/path", true /* isDirectory */);
+ assertNotEquals(fileResource, dirResource);
+ // Resolving "path" as a directory under the file's parent must agree with the directory form.
+ assertThat(
+ fileResource.getCurrentDirectory().resolve(
+ "path", StandardResolveOptions.RESOLVE_DIRECTORY),
+ equalTo(dirResource.getCurrentDirectory()));
+ assertThat(fileResource.toString(), equalTo("/some/test/resource/path"));
+ assertThat(dirResource.toString(), equalTo("/some/test/resource/path/"));
+ }
+
private void createFileWithContent(Path path, String content) throws Exception {
try (Writer writer = Channels.newWriter(
localFileSystem.create(
http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
index 2ae6b7e..1b0bd9d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.storage;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -37,6 +38,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
@@ -69,8 +71,7 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
List<GcsPath> nonGlobs = Lists.newArrayList();
List<Boolean> isGlobBooleans = Lists.newArrayList();
- for (int i = 0; i < gcsPaths.size(); ++i) {
- GcsPath path = gcsPaths.get(i);
+ for (GcsPath path : gcsPaths) {
if (GcsUtil.isGlob(path)) {
globs.add(path);
isGlobBooleans.add(true);
@@ -123,6 +124,22 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
}
@Override
+ protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+ if (isDirectory) {
+ if (!singleResourceSpec.endsWith("/")) {
+ singleResourceSpec += '/';
+ }
+ } else {
+ checkArgument(
+ !singleResourceSpec.endsWith("/"),
+ "Expected a file path, but [%s] ends with '/'. This is unsupported in GcsFileSystem.",
+ singleResourceSpec);
+ }
+ GcsPath path = GcsPath.fromUri(singleResourceSpec);
+ return GcsResourceId.fromGcsPath(path);
+ }
+
+ @Override
protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
throws IOException {
options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
@@ -196,13 +213,15 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
}
private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
- if (objectOrException.ioException() instanceof FileNotFoundException) {
- return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException());
- } else if (objectOrException.ioException() != null) {
- return MatchResult.create(Status.ERROR, objectOrException.ioException());
+ @Nullable IOException exception = objectOrException.ioException();
+ if (exception instanceof FileNotFoundException) {
+ return MatchResult.create(Status.NOT_FOUND, exception);
+ } else if (exception != null) {
+ return MatchResult.create(Status.ERROR, exception);
} else {
- return MatchResult.create(
- Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())});
+ // No exception, so StorageObjectOrIOException semantics guarantee a non-null object.
+ StorageObject object = checkNotNull(objectOrException.storageObject());
+ return MatchResult.create(Status.OK, new Metadata[]{toMetadata(object)});
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index f4e35ac..ca56a60 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -68,4 +68,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+ throw new UnsupportedOperationException(); // Unsupported in this FileSystem, like delete().
+ }
}
[2/2] beam git commit: This closes #2728
Posted by dh...@apache.org.
This closes #2728
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14a90d7c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14a90d7c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14a90d7c
Branch: refs/heads/master
Commit: 14a90d7c5196d96de66e897687d28202d790439f
Parents: fdf2de9 f93a277
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 27 14:56:24 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 14:56:24 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/FileSystem.java | 12 +++++++
.../org/apache/beam/sdk/io/FileSystems.java | 15 +++++++++
.../org/apache/beam/sdk/io/LocalFileSystem.java | 7 ++++
.../org/apache/beam/sdk/io/fs/ResourceId.java | 17 +++++++++-
.../apache/beam/sdk/io/LocalFileSystemTest.java | 22 ++++++++++++
.../beam/sdk/io/gcp/storage/GcsFileSystem.java | 35 +++++++++++++++-----
.../beam/sdk/io/hdfs/HadoopFileSystem.java | 5 +++
7 files changed, 104 insertions(+), 9 deletions(-)
----------------------------------------------------------------------