You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:34:47 UTC

[04/50] [abbrv] beam git commit: [BEAM-2277] HadoopFileSystem: normalize implementation

[BEAM-2277] HadoopFileSystem: normalize implementation

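* Always drop an empty authority from resource URIs (e.g. hdfs:///path becomes hdfs:/path)
* Honor RESOLVE_DIRECTORY / RESOLVE_FILE when resolving child resources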


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15df211c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15df211c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15df211c

Branch: refs/heads/jstorm-runner
Commit: 15df211c758e7c8f05c3136f25bbe18e3f394321
Parents: ec956c8
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 12 11:11:06 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 12 14:59:10 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/fs/ResourceIdTester.java | 15 +++++----
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      | 32 +++++++++++---------
 .../beam/sdk/io/hdfs/HadoopResourceId.java      | 16 +++++++++-
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java  |  3 +-
 .../beam/sdk/io/hdfs/HadoopResourceIdTest.java  | 22 +++++++++-----
 5 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
index fe50ada..8ceaeed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY;
 import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertFalse;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.testing.EqualsTester;
@@ -66,16 +65,16 @@ public final class ResourceIdTester {
     ResourceId file2a = baseDirectory.resolve("child2", RESOLVE_FILE);
     allResourceIds.add(file1);
     allResourceIds.add(file2);
-    assertFalse("Resolved file isDirectory()", file1.isDirectory());
-    assertFalse("Resolved file isDirectory()", file2.isDirectory());
-    assertFalse("Resolved file isDirectory()", file2a.isDirectory());
+    assertThat("Resolved file isDirectory()", file1.isDirectory(), is(false));
+    assertThat("Resolved file isDirectory()", file2.isDirectory(), is(false));
+    assertThat("Resolved file isDirectory()", file2a.isDirectory(), is(false));
 
     ResourceId dir1 = baseDirectory.resolve("child1", RESOLVE_DIRECTORY);
     ResourceId dir2 = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
     ResourceId dir2a = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
-    assertTrue("Resolved directory isDirectory()", dir1.isDirectory());
-    assertTrue("Resolved directory isDirectory()", dir2.isDirectory());
-    assertTrue("Resolved directory isDirectory()", dir2a.isDirectory());
+    assertThat("Resolved directory isDirectory()", dir1.isDirectory(), is(true));
+    assertThat("Resolved directory isDirectory()", dir2.isDirectory(), is(true));
+    assertThat("Resolved directory isDirectory()", dir2a.isDirectory(), is(true));
     allResourceIds.add(dir1);
     allResourceIds.add(dir2);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 154a818..d519a8c 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
@@ -82,8 +81,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
         List<Metadata> metadata = new ArrayList<>();
         for (FileStatus fileStatus : fileStatuses) {
           if (fileStatus.isFile()) {
+            URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
             metadata.add(Metadata.builder()
-                .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
+                .setResourceId(new HadoopResourceId(uri))
                 .setIsReadSeekEfficient(true)
                 .setSizeBytes(fileStatus.getLen())
                 .build());
@@ -151,19 +151,13 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
 
   @Override
   protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
-    try {
-      if (singleResourceSpec.endsWith("/") && !isDirectory) {
-        throw new IllegalArgumentException(String.format(
-            "Expected file path but received directory path %s", singleResourceSpec));
-      }
-      return !singleResourceSpec.endsWith("/") && isDirectory
-          ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
-          : new HadoopResourceId(new URI(singleResourceSpec));
-    } catch (URISyntaxException e) {
-      throw new IllegalArgumentException(
-          String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
-          e);
+    if (singleResourceSpec.endsWith("/") && !isDirectory) {
+      throw new IllegalArgumentException(String.format(
+          "Expected file path but received directory path %s", singleResourceSpec));
     }
+    return !singleResourceSpec.endsWith("/") && isDirectory
+        ? new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/"))
+        : new HadoopResourceId(dropEmptyAuthority(singleResourceSpec));
   }
 
   @Override
@@ -237,4 +231,14 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
       inputStream.close();
     }
   }
+
+  private static URI dropEmptyAuthority(String uriStr) {
+    URI uri = URI.create(uriStr);
+    String prefix = uri.getScheme() + ":///";
+    if (uriStr.startsWith(prefix)) {
+      return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length()));
+    } else {
+      return uri;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index e570864..88fa32a 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.net.URI;
 import java.util.Objects;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.hadoop.fs.Path;
 
@@ -35,7 +38,18 @@ class HadoopResourceId implements ResourceId {
 
   @Override
   public ResourceId resolve(String other, ResolveOptions resolveOptions) {
-    return new HadoopResourceId(uri.resolve(other));
+    if (resolveOptions == StandardResolveOptions.RESOLVE_DIRECTORY) {
+      if (!other.endsWith("/")) {
+        other += '/';
+      }
+      return new HadoopResourceId(uri.resolve(other));
+    } else if (resolveOptions == StandardResolveOptions.RESOLVE_FILE) {
+      checkArgument(!other.endsWith("/"), "Resolving a file with a directory path: %s", other);
+      return new HadoopResourceId(uri.resolve(other));
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("Unexpected StandardResolveOptions %s", resolveOptions));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index cf86c36..14591d8 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -63,14 +63,13 @@ public class HadoopFileSystemTest {
   @Rule public TestPipeline p = TestPipeline.create();
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule public ExpectedException thrown = ExpectedException.none();
-  private Configuration configuration;
   private MiniDFSCluster hdfsCluster;
   private URI hdfsClusterBaseUri;
   private HadoopFileSystem fileSystem;
 
   @Before
   public void setUp() throws Exception {
-    configuration = new Configuration();
+    Configuration configuration = new Configuration();
     configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
     hdfsCluster = builder.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
index b0d821b..f179132 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -18,9 +18,11 @@
 package org.apache.beam.sdk.io.hdfs;
 
 import java.net.URI;
+import java.util.Collections;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.fs.ResourceIdTester;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -33,21 +35,25 @@ import org.junit.rules.TemporaryFolder;
  * Tests for {@link HadoopResourceId}.
  */
 public class HadoopResourceIdTest {
-  private Configuration configuration;
+
   private MiniDFSCluster hdfsCluster;
   private URI hdfsClusterBaseUri;
-  private HadoopFileSystem fileSystem;
+
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   @Before
   public void setUp() throws Exception {
-    configuration = new Configuration();
+    Configuration configuration = new Configuration();
     configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
     hdfsCluster = builder.build();
     hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
-    fileSystem = new HadoopFileSystem(configuration);
+
+    // Register HadoopFileSystem for this test.
+    HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
+    options.setHdfsConfiguration(Collections.singletonList(configuration));
+    FileSystems.setDefaultConfigInWorkers(options);
   }
 
   @After
@@ -57,7 +63,9 @@ public class HadoopResourceIdTest {
 
   @Test
   public void testResourceIdTester() throws Exception {
-    FileSystems.setDefaultConfigInWorkers(TestPipeline.testingPipelineOptions());
-    ResourceIdTester.runResourceIdBattery(new HadoopResourceId(hdfsClusterBaseUri));
+    ResourceId baseDirectory =
+        FileSystems.matchNewResource(
+            "hdfs://" + hdfsClusterBaseUri.getPath(), true /* isDirectory */);
+    ResourceIdTester.runResourceIdBattery(baseDirectory);
   }
 }