You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 22:18:53 UTC
[1/2] beam git commit: This closes #2681
Repository: beam
Updated Branches:
refs/heads/hdfs [created] 162369aa8
This closes #2681
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/162369aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/162369aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/162369aa
Branch: refs/heads/hdfs
Commit: 162369aa8f11961b6c2c5af0c89be8ed08cb4b45
Parents: 4fabaef fac7b83
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 14:51:39 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 14:51:39 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/hdfs/HadoopResourceId.java | 48 ++++++++++-
.../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 83 ++++++++++++++++++++
2 files changed, 129 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Add HadoopResourceId
Posted by tg...@apache.org.
Add HadoopResourceId
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fac7b838
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fac7b838
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fac7b838
Branch: refs/heads/hdfs
Commit: fac7b838bb2642aeb34d12d359737fd641d52aec
Parents: 4fabaef
Author: Stephen Sisk <si...@google.com>
Authored: Fri Apr 21 17:23:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 14:51:39 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/hdfs/HadoopResourceId.java | 48 ++++++++++-
.../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 83 ++++++++++++++++++++
2 files changed, 129 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fac7b838/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index 2a29bb9..5524cac 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,26 +17,70 @@
*/
package org.apache.beam.sdk.io.hdfs;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.URI;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
/**
* {@link ResourceId} implementation for the {@link HadoopFileSystem}.
*/
public class HadoopResourceId implements ResourceId {
+ private final URI uri;
+
+ /**
+ * Constructs a HadoopResourceId from the provided absolute path. If only a relative path is
+ * available, you can create a {@link HadoopResourceId} from the absolute path of the root of the
+ * server, and then use resolve to add the relative path to the root.
+ */
+ public static HadoopResourceId fromPath(Path path) {
+ checkNotNull(path, "path must not be null");
+ checkArgument(path.isAbsolute(), "path must be absolute");
+ return new HadoopResourceId(path);
+ }
+
+ private HadoopResourceId(Path path) {
+ this.uri = path.toUri();
+ }
+
@Override
public ResourceId resolve(String other, ResolveOptions resolveOptions) {
- throw new UnsupportedOperationException();
+ // Only the two standard modes are understood; Guava fills in the %s template on failure only.
+ checkArgument(
+ resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+ || resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY),
+ "ResolveOptions: [%s] is not supported. "
+ + "Supported ResolveOptions are RESOLVE_FILE and RESOLVE_DIRECTORY.", resolveOptions);
+ if (resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)) {
+ // The rejected value is the path being resolved, so the message names it as a path.
+ checkArgument(!other.endsWith("/"),
+ "Path: [%s] ends with '/', which is not supported for RESOLVE_FILE.", other);
+ }
+ return new HadoopResourceId(new Path(new Path(uri), other));
}
@Override
public ResourceId getCurrentDirectory() {
+ // See BEAM-2069. Possible workaround: inject FileSystem into this class, and call
+ // org.apache.hadoop.fs.FileSystem#isDirectory.
throw new UnsupportedOperationException();
}
@Override
public String getScheme() {
- throw new UnsupportedOperationException();
+ return uri.getScheme();
+ }
+
+ public Path getPath() {
+ return new Path(uri);
+ }
+
+ @Override
+ public String toString() {
+ return uri.toString();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fac7b838/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
new file mode 100644
index 0000000..e4eadfa
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Tests for the HadoopResourceId class.
+ */
+public class HadoopResourceIdTest {
+ @Test
+ public void fromAndToPath() {
+ // Directory path without slash
+ Path dirPathWithoutSlash = new Path("hdfs://myhost/mydir");
+ HadoopResourceId resourceDirWithoutSlash = HadoopResourceId.fromPath(dirPathWithoutSlash);
+ assertEquals("hdfs://myhost/mydir",
+ resourceDirWithoutSlash.toString());
+ assertEquals(dirPathWithoutSlash, resourceDirWithoutSlash.getPath());
+
+ // Directory path with slash
+ Path dirPathWithSlash = new Path("hdfs://myhost/mydir/");
+ HadoopResourceId resourceDirWithSlash = HadoopResourceId.fromPath(dirPathWithSlash);
+ assertEquals("hdfs://myhost/mydir",
+ resourceDirWithSlash.toString());
+ assertEquals(dirPathWithSlash, resourceDirWithSlash.getPath());
+
+ // File path
+ Path filePath = new Path("hdfs://myhost/mydir/myfile.txt");
+ HadoopResourceId resourceFile = HadoopResourceId.fromPath(filePath);
+ assertEquals("hdfs://myhost/mydir/myfile.txt",
+ resourceFile.toString());
+ assertEquals(filePath, resourceFile.getPath());
+ }
+
+ @Test
+ public void handlesRelativePathsAddedToDir() {
+ // Directory + file - slash on Directory
+ HadoopResourceId dirWithSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/"));
+ assertEquals("hdfs://myhost/mydir/myfile.txt",
+ dirWithSlash.resolve("myfile.txt",
+ ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+
+ // Directory + Directory
+ assertEquals("hdfs://myhost/mydir/2nddir",
+ dirWithSlash.resolve("2nddir",
+ ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+ assertEquals("hdfs://myhost/mydir/2nddir",
+ dirWithSlash.resolve("2nddir/",
+ ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+
+
+ // Directory + File - no slash on either
+ HadoopResourceId dirWithoutSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir"));
+ assertEquals("hdfs://myhost/mydir/myfile.txt",
+ dirWithoutSlash.resolve("myfile.txt",
+ ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+ }
+
+ @Test
+ public void testScheme() {
+ assertEquals("hdfs",
+ HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/file.txt")).getScheme());
+ }
+}