Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 18:10:58 UTC

[1/2] beam git commit: This closes #2671

Repository: beam
Updated Branches:
  refs/heads/master ebd3bffcd -> 9fdefe1ad


This closes #2671


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9fdefe1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9fdefe1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9fdefe1a

Branch: refs/heads/master
Commit: 9fdefe1ade96880817533dd983d03fcd39f90c9d
Parents: ebd3bff 512b2c0
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 10:48:17 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 10:48:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/hdfs/HadoopResourceId.java      | 47 +++++++++++++-
 .../beam/sdk/io/hdfs/HadoopResourceIdTest.java  | 66 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Add HadoopResourceId

Posted by tg...@apache.org.
Add HadoopResourceId


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/512b2c0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/512b2c0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/512b2c0a

Branch: refs/heads/master
Commit: 512b2c0abb68c777c207dcec39de5c44e6ff34da
Parents: ebd3bff
Author: Stephen Sisk <si...@google.com>
Authored: Fri Apr 21 17:23:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 10:48:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/hdfs/HadoopResourceId.java      | 47 +++++++++++++-
 .../beam/sdk/io/hdfs/HadoopResourceIdTest.java  | 66 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/512b2c0a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index 2a29bb9..5a66ac9 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,26 +17,69 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
 
 /**
  * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
  */
 public class HadoopResourceId implements ResourceId {
 
+  private final Path path;
+
+  /**
+   * Constructs a HadoopResourceId from the provided absolute path. If only a relative path is
+   * available, create a {@link HadoopResourceId} from the absolute path of the server's root,
+   * and then use {@link #resolve} to append the relative path to it.
+   */
+  public static HadoopResourceId fromPath(Path path) {
+    checkNotNull(path, "path must not be null");
+    checkArgument(path.isAbsolute(), "path must be absolute");
+    return new HadoopResourceId(path);
+  }
+
+  private HadoopResourceId(Path path) {
+    this.path = path;
+  }
+
   @Override
   public ResourceId resolve(String other, ResolveOptions resolveOptions) {
-    throw new UnsupportedOperationException();
+    checkArgument(
+        resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+            || resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY),
+        String.format("ResolveOptions: [%s] is not supported. "
+            + "Supported ResolveOptions are RESOLVE_FILE and RESOLVE_DIRECTORY.", resolveOptions));
+    if (resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)) {
+      checkArgument(
+          !other.endsWith("/"),
+          "ResolveOptions: [%s] ends with '/', which is not supported for RESOLVE_FILE.",
+          other);
+    }
+    return new HadoopResourceId(new Path(path, other));
   }
 
   @Override
   public ResourceId getCurrentDirectory() {
+    // See BEAM-2069. Possible workaround: inject FileSystem into this class, and call
+    // org.apache.hadoop.fs.FileSystem#isDirectory.
     throw new UnsupportedOperationException();
   }
 
   @Override
   public String getScheme() {
-    throw new UnsupportedOperationException();
+    return path.toUri().getScheme();
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public String toString() {
+    return path.toString();
   }
 }
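
For illustration only, and not part of this commit: a minimal sketch of how the
API added above could be used. The hdfs://myhost paths and the
HadoopResourceIdExample class are hypothetical.

import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.hdfs.HadoopResourceId;
import org.apache.hadoop.fs.Path;

public class HadoopResourceIdExample {
  public static void main(String[] args) {
    // fromPath requires an absolute path; a relative Path fails its checkArgument.
    HadoopResourceId dir = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir"));

    // resolve() is pure path manipulation; no live HDFS connection is needed here.
    ResourceId file = dir.resolve("myfile.txt", StandardResolveOptions.RESOLVE_FILE);
    ResourceId subdir = dir.resolve("subdir", StandardResolveOptions.RESOLVE_DIRECTORY);

    System.out.println(file);            // hdfs://myhost/mydir/myfile.txt
    System.out.println(subdir);          // hdfs://myhost/mydir/subdir
    System.out.println(dir.getScheme()); // hdfs
  }
}

The BEAM-2069 comment above suggests injecting a Hadoop FileSystem and calling
org.apache.hadoop.fs.FileSystem#isDirectory to implement getCurrentDirectory.
One possible shape of that workaround, as a hypothetical sketch (the
HadoopResourceIdWithFs class is invented here, not part of this commit):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class HadoopResourceIdWithFs {
  private final FileSystem fs;
  private final Path path;

  HadoopResourceIdWithFs(FileSystem fs, Path path) {
    this.fs = fs;
    this.path = path;
  }

  // Hypothetical getCurrentDirectory: a directory is its own current
  // directory; anything else resolves to its parent. (A root path would
  // additionally need a null-parent guard.)
  HadoopResourceIdWithFs getCurrentDirectory() throws IOException {
    if (fs.isDirectory(path)) {
      return this;
    }
    return new HadoopResourceIdWithFs(fs, path.getParent());
  }
}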

http://git-wip-us.apache.org/repos/asf/beam/blob/512b2c0a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
new file mode 100644
index 0000000..724bcd9
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -0,0 +1,66 @@
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Tests for the HadoopResourceId class.
+ */
+public class HadoopResourceIdTest {
+  @Test
+  public void fromAndToPath() {
+    // Directory path without slash
+    Path dirPathWithoutSlash = new Path("hdfs://myhost/mydir");
+    HadoopResourceId resourceDirWithoutSlash = HadoopResourceId.fromPath(dirPathWithoutSlash);
+    assertEquals("hdfs://myhost/mydir",
+        resourceDirWithoutSlash.toString());
+    assertEquals(dirPathWithoutSlash, resourceDirWithoutSlash.getPath());
+
+    // Directory path with slash
+    Path dirPathWithSlash = new Path("hdfs://myhost/mydir/");
+    HadoopResourceId resourceDirWithSlash = HadoopResourceId.fromPath(dirPathWithSlash);
+    assertEquals("hdfs://myhost/mydir",
+        resourceDirWithSlash.toString());
+    assertEquals(dirPathWithSlash, resourceDirWithSlash.getPath());
+
+    // File path
+    Path filePath = new Path("hdfs://myhost/mydir/myfile.txt");
+    HadoopResourceId resourceFile = HadoopResourceId.fromPath(filePath);
+    assertEquals("hdfs://myhost/mydir/myfile.txt",
+        resourceFile.toString());
+    assertEquals(filePath, resourceFile.getPath());
+  }
+
+  @Test
+  public void handlesRelativePathsAddedToDir() {
+    // Directory + file - slash on Directory
+    HadoopResourceId dirWithSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/"));
+    assertEquals("hdfs://myhost/mydir/myfile.txt",
+        dirWithSlash.resolve("myfile.txt",
+            ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+
+    // Directory + Directory
+    assertEquals("hdfs://myhost/mydir/2nddir",
+        dirWithSlash.resolve("2nddir",
+            ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+    assertEquals("hdfs://myhost/mydir/2nddir",
+        dirWithSlash.resolve("2nddir/",
+            ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+
+
+    // Directory + File - no slash on either
+    HadoopResourceId dirWithoutSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir"));
+    assertEquals("hdfs://myhost/mydir/myfile.txt",
+        dirWithoutSlash.resolve("myfile.txt",
+            ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+  }
+
+  @Test
+  public void testScheme() {
+    assertEquals("hdfs",
+        HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/file.txt")).getScheme());
+  }
+}
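
The tests above do not exercise one behavior of the new resolve implementation:
with RESOLVE_FILE, a target ending in '/' is rejected via checkArgument. A
hypothetical test for it could look like the sketch below
(rejectsTrailingSlashForResolveFile is not part of this commit, and it
additionally assumes a static import of org.junit.Assert.fail):

  @Test
  public void rejectsTrailingSlashForResolveFile() {
    HadoopResourceId dir = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir"));
    try {
      // resolve() rejects RESOLVE_FILE targets that end with '/'.
      dir.resolve("myfile.txt/", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
      fail("Expected IllegalArgumentException for a RESOLVE_FILE target ending in '/'");
    } catch (IllegalArgumentException expected) {
      // Expected: checkArgument throws IllegalArgumentException.
    }
  }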