You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/27 21:56:31 UTC

[1/2] beam git commit: [BEAM-59] Add FileSystems#matchNewResource

Repository: beam
Updated Branches:
  refs/heads/master fdf2de999 -> 14a90d7c5


[BEAM-59] Add FileSystems#matchNewResource

The new FileSystems API needs a way to generate a ResourceId for a
resource that does not exist. This does not come up in sources, because
we typically are just matching existing files. However, sinks need the
ability to reference a new directory (say, in order to create it).

I couldn't think of anything better than a simple function that says
"treat this string as a full resource path of the specified type",
which is exactly what FileSystems#matchNewResource does.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f93a2775
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f93a2775
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f93a2775

Branch: refs/heads/master
Commit: f93a27755921e73141fd97de042e6dcd04e10a47
Parents: fdf2de9
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 25 17:29:02 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 14:51:04 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/FileSystem.java | 12 +++++++
 .../org/apache/beam/sdk/io/FileSystems.java     | 15 +++++++++
 .../org/apache/beam/sdk/io/LocalFileSystem.java |  7 ++++
 .../org/apache/beam/sdk/io/fs/ResourceId.java   | 17 +++++++++-
 .../apache/beam/sdk/io/LocalFileSystemTest.java | 22 ++++++++++++
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  | 35 +++++++++++++++-----
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      |  5 +++
 7 files changed, 104 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 001f596..76c5dc1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -139,4 +139,16 @@ public abstract class FileSystem<ResourceIdT extends ResourceId> {
    * to determine the state of the resources.
    */
   protected abstract void delete(Collection<ResourceIdT> resourceIds) throws IOException;
+
+  /**
+   * Returns a new {@link ResourceId} for this filesystem that names {@code singleResourceSpec},
+   * as a directory if {@code isDirectory} is true and as a file otherwise.
+   *
+   * <p>{@code singleResourceSpec} must be a full resource name in this {@link FileSystem}'s
+   * format, including any necessary escaping. The named resource need not exist yet.
+   *
+   * <p>Implementations may throw an {@link IllegalArgumentException} for an invalid argument,
+   * such as when {@code singleResourceSpec} is not a valid resource name.
+   */
+  protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 0b50070..b290498 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -485,4 +485,19 @@ public class FileSystems {
       }
     }
   }
+
+  /**
+   * Returns a new {@link ResourceId} that represents the named resource of a type corresponding
+   * to the resource type.
+   *
+   * <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
+   * any necessary escaping, for the underlying {@link FileSystem}.
+   *
+   * <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
+   * such as when the specified {@code singleResourceSpec} is not a valid resource name.
+   */
+  public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+    return getFileSystemInternal(parseScheme(singleResourceSpec))
+        .matchNewResource(singleResourceSpec, isDirectory);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 8349a35..2d80ae4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -34,6 +34,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
@@ -164,6 +165,12 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
     }
   }
 
+  @Override
+  protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+    Path path = Paths.get(singleResourceSpec);
+    return LocalResourceId.fromPath(path, isDirectory);
+  }
+
   private MatchResult matchOne(String spec) throws IOException {
     File file = Paths.get(spec).toFile();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index b7859ca..26a21bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.fs;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 
 /**
@@ -27,7 +28,21 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
  * <p>{@link ResourceId} is hierarchical and composed of a sequence of directory
  * and file name elements separated by a special separator or delimiter.
  *
- * <p>TODO: add examples for how ResourceId is constructed and used.
+ * <p>{@link ResourceId ResourceIds} are created using {@link FileSystems}. The two primary
+ * mechanisms are:
+ *
+ * <ul>
+ *   <li>{@link FileSystems#match(java.util.List)}, which takes a list of {@link String} resource
+ *   names or globs, queries the {@link FileSystem} for resources matching these specifications,
+ *   and returns a {@link MatchResult} for each glob. This is typically used when reading from
+ *   files.
+ *
+ *   <li>{@link FileSystems#matchNewResource(String, boolean)}, which takes a {@link String} full
+ *   resource name and type (file or directory) and generates a {@link FileSystem}-specific
+ *   {@code ResourceId} for that resource. This call does not verify the presence or absence of that
+ *   resource in the file system. This call is typically used when creating new directories or files
+ *   to generate {@link ResourceId ResourceIds} for resources that may not yet exist.
+ * </ul>
  */
 public interface ResourceId extends Serializable {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
index d335974..ac4fe61 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
@@ -18,8 +18,10 @@
 package org.apache.beam.sdk.io;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -301,6 +303,26 @@ public class LocalFileSystemTest {
         toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty());
   }
 
+  @Test
+  public void testMatchNewResource() throws Exception {
+    LocalResourceId fileResource =
+        localFileSystem
+            .matchNewResource("/some/test/resource/path", false /* isDirectory */);
+    LocalResourceId dirResource =
+        localFileSystem
+            .matchNewResource("/some/test/resource/path", true /* isDirectory */);
+    assertNotEquals(fileResource, dirResource);
+    assertThat(
+        fileResource.getCurrentDirectory().resolve(
+            "path", StandardResolveOptions.RESOLVE_DIRECTORY),
+        equalTo(dirResource.getCurrentDirectory()));
+    // A file resource has no trailing separator; only the directory form gains one.
+    assertThat(
+        fileResource.toString(),
+        equalTo("/some/test/resource/path"));
+    assertThat(dirResource.toString(), equalTo("/some/test/resource/path/"));
+  }
+
   private void createFileWithContent(Path path, String content) throws Exception {
     try (Writer writer = Channels.newWriter(
         localFileSystem.create(

http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
index 2ae6b7e..1b0bd9d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.storage;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -37,6 +38,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.fs.CreateOptions;
@@ -69,8 +71,7 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
     List<GcsPath> nonGlobs = Lists.newArrayList();
     List<Boolean> isGlobBooleans = Lists.newArrayList();
 
-    for (int i = 0; i < gcsPaths.size(); ++i) {
-      GcsPath path = gcsPaths.get(i);
+    for (GcsPath path : gcsPaths) {
       if (GcsUtil.isGlob(path)) {
         globs.add(path);
         isGlobBooleans.add(true);
@@ -123,6 +124,22 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
   }
 
   @Override
+  protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+    // fromGcsPath takes no directory flag, so the kind of resource is carried solely by a
+    // trailing '/': it is appended for directories and rejected for files.
+    boolean endsWithSlash = singleResourceSpec.endsWith("/");
+    if (isDirectory && !endsWithSlash) {
+      singleResourceSpec += '/';
+    }
+    checkArgument(
+        isDirectory || !endsWithSlash,
+        "Expected a file path, but [%s] ends with '/'. This is unsupported in GcsFileSystem.",
+        singleResourceSpec);
+    GcsPath path = GcsPath.fromUri(singleResourceSpec);
+    return GcsResourceId.fromGcsPath(path);
+  }
+
+  @Override
   protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
       throws IOException {
     options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
@@ -196,13 +213,15 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
   }
 
   private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
-    if (objectOrException.ioException() instanceof FileNotFoundException) {
-      return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException());
-    } else if (objectOrException.ioException() != null) {
-      return MatchResult.create(Status.ERROR, objectOrException.ioException());
+    @Nullable IOException exception = objectOrException.ioException();
+    if (exception instanceof FileNotFoundException) {
+      return MatchResult.create(Status.NOT_FOUND, exception);
+    } else if (exception != null) {
+      return MatchResult.create(Status.ERROR, exception);
     } else {
-      return MatchResult.create(
-          Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())});
+      // Non-null whenever ioException() is null; checkNotNull enforces this even with -da.
+      StorageObject object = checkNotNull(objectOrException.storageObject());
+      return MatchResult.create(Status.OK, new Metadata[]{toMetadata(object)});
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index f4e35ac..ca56a60 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -68,4 +68,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
   protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+    throw new UnsupportedOperationException(); // Unsupported here, like delete() above.
+  }
 }


[2/2] beam git commit: This closes #2728

Posted by dh...@apache.org.
This closes #2728


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14a90d7c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14a90d7c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14a90d7c

Branch: refs/heads/master
Commit: 14a90d7c5196d96de66e897687d28202d790439f
Parents: fdf2de9 f93a277
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 27 14:56:24 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 14:56:24 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/FileSystem.java | 12 +++++++
 .../org/apache/beam/sdk/io/FileSystems.java     | 15 +++++++++
 .../org/apache/beam/sdk/io/LocalFileSystem.java |  7 ++++
 .../org/apache/beam/sdk/io/fs/ResourceId.java   | 17 +++++++++-
 .../apache/beam/sdk/io/LocalFileSystemTest.java | 22 ++++++++++++
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  | 35 +++++++++++++++-----
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      |  5 +++
 7 files changed, 104 insertions(+), 9 deletions(-)
----------------------------------------------------------------------