You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 18:10:58 UTC
[1/2] beam git commit: This closes #2671
Repository: beam
Updated Branches:
refs/heads/master ebd3bffcd -> 9fdefe1ad
This closes #2671
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9fdefe1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9fdefe1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9fdefe1a
Branch: refs/heads/master
Commit: 9fdefe1ade96880817533dd983d03fcd39f90c9d
Parents: ebd3bff 512b2c0
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 10:48:17 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 10:48:17 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/hdfs/HadoopResourceId.java | 47 +++++++++++++-
.../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 66 ++++++++++++++++++++
2 files changed, 111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Add HadoopResourceId
Posted by tg...@apache.org.
Add HadoopResourceId
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/512b2c0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/512b2c0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/512b2c0a
Branch: refs/heads/master
Commit: 512b2c0abb68c777c207dcec39de5c44e6ff34da
Parents: ebd3bff
Author: Stephen Sisk <si...@google.com>
Authored: Fri Apr 21 17:23:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 10:48:17 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/hdfs/HadoopResourceId.java | 47 +++++++++++++-
.../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 66 ++++++++++++++++++++
2 files changed, 111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/512b2c0a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index 2a29bb9..5a66ac9 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,26 +17,69 @@
*/
package org.apache.beam.sdk.io.hdfs;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
/**
 * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
 */
public class HadoopResourceId implements ResourceId {
+ // Backing Hadoop path; guaranteed absolute by fromPath, never null.
+ private final Path path;
+
+ /**
+ * Constructs a HadoopResourceId from the provided absolute path. If only a relative path is
+ * available, you can create a {@link HadoopResourceId} from the absolute path of the root of the
+ * server, and then use resolve to add the relative path to the root.
+ */
+ public static HadoopResourceId fromPath(Path path) {
+ checkNotNull(path, "path must not be null")...
+ checkArgument(path.isAbsolute(), "path must be absolute");
+ return new HadoopResourceId(path);
+ }
+
+ // Private: construction goes through fromPath so the absolute-path invariant always holds.
+ private HadoopResourceId(Path path) {
+ this.path = path;
+ }
+
 @Override
 public ResourceId resolve(String other, ResolveOptions resolveOptions) {
- throw new UnsupportedOperationException();
+ // Only the two standard resolve modes are supported; anything else is rejected eagerly.
+ checkArgument(
+ resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+ || resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY),
+ String.format("ResolveOptions: [%s] is not supported. "
+ + "Supported ResolveOptions are RESOLVE_FILE and RESOLVE_DIRECTORY.", resolveOptions));
+ if (resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)) {
+ // NOTE(review): the message below interpolates 'other' (the path segment being resolved),
+ // not a ResolveOptions value — the "ResolveOptions: [%s]" label in the text is misleading.
+ checkArgument(
+ !other.endsWith("/"),
+ "ResolveOptions: [%s] ends with '/', which is not supported for RESOLVE_FILE.",
+ other);
+ }
+ // Joining and normalization are delegated to Hadoop's Path(parent, child) constructor;
+ // per the tests below it collapses trailing slashes (RESOLVE_DIRECTORY of "2nddir" and
+ // "2nddir/" produce the same result).
+ return new HadoopResourceId(new Path(path, other));
 }
 @Override
 public ResourceId getCurrentDirectory() {
+ // See BEAM-2069. Possible workaround: inject FileSystem into this class, and call
+ // org.apache.hadoop.fs.FileSystem#isDirectory.
 throw new UnsupportedOperationException();
 }
 @Override
 public String getScheme() {
- throw new UnsupportedOperationException();
+ // NOTE(review): Path.toUri().getScheme() can be null for absolute-but-scheme-less paths
+ // (isAbsolute() does not imply a scheme) — confirm callers handle a null return.
+ return path.toUri().getScheme();
+ }
+
+ // Exposes the underlying Hadoop Path for interop with Hadoop APIs.
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public String toString() {
+ return path.toString();
 }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/512b2c0a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
new file mode 100644
index 0000000..724bcd9
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -0,0 +1,66 @@
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+// NOTE(review): per the hunk header (@@ -0,0 +1,66 @@) this new file starts at the package
+// declaration — it lacks the standard Apache license header; verify against the RAT check.
+/**
+ * Tests for the HadoopResourceId class.
+ */
+public class HadoopResourceIdTest {
+ // Round-trips Paths through fromPath/getPath/toString for directory and file inputs.
+ @Test
+ public void fromAndToPath() {
+ // Directory path without slash
+ Path dirPathWithoutSlash = new Path("hdfs://myhost/mydir");
+ HadoopResourceId resourceDirWithoutSlash = HadoopResourceId.fromPath(dirPathWithoutSlash);
+ assertEquals("hdfs://myhost/mydir",
+ resourceDirWithoutSlash.toString());
+ assertEquals(dirPathWithoutSlash, resourceDirWithoutSlash.getPath());
+
+ // Directory path with slash — Hadoop's Path normalizes away the trailing slash,
+ // so toString matches the no-slash form while equals on the Paths still holds.
+ Path dirPathWithSlash = new Path("hdfs://myhost/mydir/");
+ HadoopResourceId resourceDirWithSlash = HadoopResourceId.fromPath(dirPathWithSlash);
+ assertEquals("hdfs://myhost/mydir",
+ resourceDirWithSlash.toString());
+ assertEquals(dirPathWithSlash, resourceDirWithSlash.getPath());
+
+ // File path
+ Path filePath = new Path("hdfs://myhost/mydir/myfile.txt");
+ HadoopResourceId resourceFile = HadoopResourceId.fromPath(filePath);
+ assertEquals("hdfs://myhost/mydir/myfile.txt",
+ resourceFile.toString());
+ assertEquals(filePath, resourceFile.getPath());
+ }
+
+ // Exercises resolve() with RESOLVE_FILE and RESOLVE_DIRECTORY against base paths
+ // with and without trailing slashes.
+ @Test
+ public void handlesRelativePathsAddedToDir() {
+ // Directory + file - slash on Directory
+ HadoopResourceId dirWithSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/"));
+ assertEquals("hdfs://myhost/mydir/myfile.txt",
+ dirWithSlash.resolve("myfile.txt",
+ ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+
+ // Directory + Directory — "2nddir" and "2nddir/" resolve identically, confirming
+ // trailing-slash normalization in the underlying Path join.
+ assertEquals("hdfs://myhost/mydir/2nddir",
+ dirWithSlash.resolve("2nddir",
+ ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+ assertEquals("hdfs://myhost/mydir/2nddir",
+ dirWithSlash.resolve("2nddir/",
+ ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+
+
+ // Directory + File - no slash on either
+ HadoopResourceId dirWithoutSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir"));
+ assertEquals("hdfs://myhost/mydir/myfile.txt",
+ dirWithoutSlash.resolve("myfile.txt",
+ ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+ }
+
+ // getScheme should surface the URI scheme of the underlying path.
+ @Test
+ public void testScheme() {
+ assertEquals("hdfs",
+ HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/file.txt")).getScheme());
+ }
+}