You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 22:18:53 UTC

[1/2] beam git commit: This closes #2681

Repository: beam
Updated Branches:
  refs/heads/hdfs [created] 162369aa8


This closes #2681


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/162369aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/162369aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/162369aa

Branch: refs/heads/hdfs
Commit: 162369aa8f11961b6c2c5af0c89be8ed08cb4b45
Parents: 4fabaef fac7b83
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 14:51:39 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 14:51:39 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/hdfs/HadoopResourceId.java      | 48 ++++++++++-
 .../beam/sdk/io/hdfs/HadoopResourceIdTest.java  | 83 ++++++++++++++++++++
 2 files changed, 129 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Add HadoopResourceId

Posted by tg...@apache.org.
Add HadoopResourceId


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fac7b838
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fac7b838
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fac7b838

Branch: refs/heads/hdfs
Commit: fac7b838bb2642aeb34d12d359737fd641d52aec
Parents: 4fabaef
Author: Stephen Sisk <si...@google.com>
Authored: Fri Apr 21 17:23:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 14:51:39 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/hdfs/HadoopResourceId.java      | 48 ++++++++++-
 .../beam/sdk/io/hdfs/HadoopResourceIdTest.java  | 83 ++++++++++++++++++++
 2 files changed, 129 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fac7b838/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index 2a29bb9..5524cac 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,26 +17,70 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.URI;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
 
 /**
  * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
  */
 public class HadoopResourceId implements ResourceId {
 
+  private final URI uri;
+
+  /**
+   * Constructs a HadoopResourceId from the provided absolute path. If only a relative path is
+   * available, you can create a {@link HadoopResourceId} from the absolute path of the root of the
+   * server, and then use resolve to add the relative path to the root.
+   */
+  public static HadoopResourceId fromPath(Path path) {
+    checkNotNull(path, "path must not be null");
+    checkArgument(path.isAbsolute(), "path must be absolute");
+    return new HadoopResourceId(path);
+  }
+
+  private HadoopResourceId(Path path) {
+    this.uri = path.toUri();
+  }
+
   @Override
   public ResourceId resolve(String other, ResolveOptions resolveOptions) {
-    throw new UnsupportedOperationException();
+    checkArgument(
+        resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+            || resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY),
+        String.format("ResolveOptions: [%s] is not supported. "
+            + "Supported ResolveOptions are RESOLVE_FILE and RESOLVE_DIRECTORY.", resolveOptions));
+    if (resolveOptions.equals(ResolveOptions.StandardResolveOptions.RESOLVE_FILE)) {
+      checkArgument(
+          !other.endsWith("/"),
+          "ResolveOptions: [%s] ends with '/', which is not supported for RESOLVE_FILE.",
+          other);
+    }
+    return new HadoopResourceId(new Path(new Path(uri), other));
   }
 
   @Override
   public ResourceId getCurrentDirectory() {
+    // See BEAM-2069. Possible workaround: inject FileSystem into this class, and call
+    // org.apache.hadoop.fs.FileSystem#isDirectory.
     throw new UnsupportedOperationException();
   }
 
   @Override
   public String getScheme() {
-    throw new UnsupportedOperationException();
+    return uri.getScheme();
+  }
+
+  public Path getPath() {
+    return new Path(uri);
+  }
+
+  @Override
+  public String toString() {
+    return uri.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fac7b838/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
new file mode 100644
index 0000000..e4eadfa
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Tests for the HadoopResourceId class.
+ */
+public class HadoopResourceIdTest {
+  @Test
+  public void fromAndToPath() {
+    // Directory path without slash
+    Path dirPathWithoutSlash = new Path("hdfs://myhost/mydir");
+    HadoopResourceId resourceDirWithoutSlash = HadoopResourceId.fromPath(dirPathWithoutSlash);
+    assertEquals("hdfs://myhost/mydir",
+        resourceDirWithoutSlash.toString());
+    assertEquals(dirPathWithoutSlash, resourceDirWithoutSlash.getPath());
+
+    // Directory path with slash
+    Path dirPathWithSlash = new Path("hdfs://myhost/mydir/");
+    HadoopResourceId resourceDirWithSlash = HadoopResourceId.fromPath(dirPathWithSlash);
+    assertEquals("hdfs://myhost/mydir",
+        resourceDirWithSlash.toString());
+    assertEquals(dirPathWithSlash, resourceDirWithSlash.getPath());
+
+    // File path
+    Path filePath = new Path("hdfs://myhost/mydir/myfile.txt");
+    HadoopResourceId resourceFile = HadoopResourceId.fromPath(filePath);
+    assertEquals("hdfs://myhost/mydir/myfile.txt",
+        resourceFile.toString());
+    assertEquals(filePath, resourceFile.getPath());
+  }
+
+  @Test
+  public void handlesRelativePathsAddedToDir() {
+    // Directory + file - slash on Directory
+    HadoopResourceId dirWithSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/"));
+    assertEquals("hdfs://myhost/mydir/myfile.txt",
+        dirWithSlash.resolve("myfile.txt",
+            ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+
+    // Directory + Directory
+    assertEquals("hdfs://myhost/mydir/2nddir",
+        dirWithSlash.resolve("2nddir",
+            ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+    assertEquals("hdfs://myhost/mydir/2nddir",
+        dirWithSlash.resolve("2nddir/",
+            ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).toString());
+
+
+    // Directory + File - no slash on either
+    HadoopResourceId dirWithoutSlash = HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir"));
+    assertEquals("hdfs://myhost/mydir/myfile.txt",
+        dirWithoutSlash.resolve("myfile.txt",
+            ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
+  }
+
+  @Test
+  public void testScheme() {
+    assertEquals("hdfs",
+        HadoopResourceId.fromPath(new Path("hdfs://myhost/mydir/file.txt")).getScheme());
+  }
+}