Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:34:47 UTC
[04/50] [abbrv] beam git commit: [BEAM-2277] HadoopFileSystem: normalize implementation
[BEAM-2277] HadoopFileSystem: normalize implementation
* Drop empty authority always
* Resolve directories
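
For illustration only (not part of the commit): the first bullet refers to the dropEmptyAuthority() helper added to HadoopFileSystem.java below, which rewrites scheme-only URIs such as hdfs:///tmp/file to hdfs:/tmp/file while leaving URIs with a real authority (hdfs://namenode/tmp/file) untouched. A minimal standalone sketch of that behavior, with a hypothetical class name:

    import java.net.URI;

    public class DropEmptyAuthorityDemo {  // hypothetical name, for illustration
      // Mirrors the dropEmptyAuthority() helper in the diff below: rewrite
      // "hdfs:///path" (empty authority) to "hdfs:/path"; leave other URIs as-is.
      static URI dropEmptyAuthority(String uriStr) {
        URI uri = URI.create(uriStr);
        String prefix = uri.getScheme() + ":///";
        if (uriStr.startsWith(prefix)) {
          return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length()));
        }
        return uri;
      }

      public static void main(String[] args) {
        System.out.println(dropEmptyAuthority("hdfs:///tmp/file"));         // hdfs:/tmp/file
        System.out.println(dropEmptyAuthority("hdfs://namenode/tmp/file")); // unchanged
      }
    }
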
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15df211c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15df211c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15df211c
Branch: refs/heads/jstorm-runner
Commit: 15df211c758e7c8f05c3136f25bbe18e3f394321
Parents: ec956c8
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 12 11:11:06 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 12 14:59:10 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/fs/ResourceIdTester.java | 15 +++++----
.../beam/sdk/io/hdfs/HadoopFileSystem.java | 32 +++++++++++---------
.../beam/sdk/io/hdfs/HadoopResourceId.java | 16 +++++++++-
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 3 +-
.../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 22 +++++++++-----
5 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
index fe50ada..8ceaeed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY;
import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertFalse;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.testing.EqualsTester;
@@ -66,16 +65,16 @@ public final class ResourceIdTester {
ResourceId file2a = baseDirectory.resolve("child2", RESOLVE_FILE);
allResourceIds.add(file1);
allResourceIds.add(file2);
- assertFalse("Resolved file isDirectory()", file1.isDirectory());
- assertFalse("Resolved file isDirectory()", file2.isDirectory());
- assertFalse("Resolved file isDirectory()", file2a.isDirectory());
+ assertThat("Resolved file isDirectory()", file1.isDirectory(), is(false));
+ assertThat("Resolved file isDirectory()", file2.isDirectory(), is(false));
+ assertThat("Resolved file isDirectory()", file2a.isDirectory(), is(false));
ResourceId dir1 = baseDirectory.resolve("child1", RESOLVE_DIRECTORY);
ResourceId dir2 = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
ResourceId dir2a = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
- assertTrue("Resolved directory isDirectory()", dir1.isDirectory());
- assertTrue("Resolved directory isDirectory()", dir2.isDirectory());
- assertTrue("Resolved directory isDirectory()", dir2a.isDirectory());
+ assertThat("Resolved directory isDirectory()", dir1.isDirectory(), is(true));
+ assertThat("Resolved directory isDirectory()", dir2.isDirectory(), is(true));
+ assertThat("Resolved directory isDirectory()", dir2a.isDirectory(), is(true));
allResourceIds.add(dir1);
allResourceIds.add(dir2);
http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 154a818..d519a8c 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@@ -82,8 +81,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
List<Metadata> metadata = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
+ URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
metadata.add(Metadata.builder()
- .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
+ .setResourceId(new HadoopResourceId(uri))
.setIsReadSeekEfficient(true)
.setSizeBytes(fileStatus.getLen())
.build());
@@ -151,19 +151,13 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
@Override
protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
- try {
- if (singleResourceSpec.endsWith("/") && !isDirectory) {
- throw new IllegalArgumentException(String.format(
- "Expected file path but received directory path %s", singleResourceSpec));
- }
- return !singleResourceSpec.endsWith("/") && isDirectory
- ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
- : new HadoopResourceId(new URI(singleResourceSpec));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(
- String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
- e);
+ if (singleResourceSpec.endsWith("/") && !isDirectory) {
+ throw new IllegalArgumentException(String.format(
+ "Expected file path but received directory path %s", singleResourceSpec));
}
+ return !singleResourceSpec.endsWith("/") && isDirectory
+ ? new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/"))
+ : new HadoopResourceId(dropEmptyAuthority(singleResourceSpec));
}
@Override
@@ -237,4 +231,14 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
inputStream.close();
}
}
+
+ private static URI dropEmptyAuthority(String uriStr) {
+ URI uri = URI.create(uriStr);
+ String prefix = uri.getScheme() + ":///";
+ if (uriStr.startsWith(prefix)) {
+ return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length()));
+ } else {
+ return uri;
+ }
+ }
}
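
A minimal, dependency-free sketch (hypothetical class and method names; plain java.net.URI, no Hadoop types) of the matchNewResource() normalization above: a directory spec without a trailing '/' gets one appended, and a file spec ending in '/' is rejected:

    import java.net.URI;

    public class MatchNewResourceDemo {  // hypothetical name, for illustration
      // Same branching as matchNewResource() above, minus the HadoopResourceId wrapper.
      static URI normalizeSpec(String spec, boolean isDirectory) {
        if (spec.endsWith("/") && !isDirectory) {
          throw new IllegalArgumentException(String.format(
              "Expected file path but received directory path %s", spec));
        }
        return !spec.endsWith("/") && isDirectory
            ? URI.create(spec + "/")
            : URI.create(spec);
      }

      public static void main(String[] args) {
        System.out.println(normalizeSpec("hdfs://namenode/tmp/dir", true));   // .../tmp/dir/
        System.out.println(normalizeSpec("hdfs://namenode/tmp/file", false)); // .../tmp/file
      }
    }
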
http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index e570864..88fa32a 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.io.hdfs;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.net.URI;
import java.util.Objects;
import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.hadoop.fs.Path;
@@ -35,7 +38,18 @@ class HadoopResourceId implements ResourceId {
@Override
public ResourceId resolve(String other, ResolveOptions resolveOptions) {
- return new HadoopResourceId(uri.resolve(other));
+ if (resolveOptions == StandardResolveOptions.RESOLVE_DIRECTORY) {
+ if (!other.endsWith("/")) {
+ other += '/';
+ }
+ return new HadoopResourceId(uri.resolve(other));
+ } else if (resolveOptions == StandardResolveOptions.RESOLVE_FILE) {
+ checkArgument(!other.endsWith("/"), "Resolving a file with a directory path: %s", other);
+ return new HadoopResourceId(uri.resolve(other));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unexpected StandardResolveOptions %s", resolveOptions));
+ }
}
@Override
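
A small standalone sketch (plain java.net.URI, no Beam dependencies, hypothetical class name) of the resolve() semantics introduced above: RESOLVE_DIRECTORY appends a trailing '/' so the result stays a directory URI, while RESOLVE_FILE resolves the name as-is and rejects names ending in '/':

    import java.net.URI;

    public class ResolveDemo {  // hypothetical name, for illustration
      public static void main(String[] args) {
        URI base = URI.create("hdfs://namenode/base/");
        // RESOLVE_DIRECTORY: "child" is given a trailing '/' before URI.resolve,
        // so the result is hdfs://namenode/base/child/ rather than .../base/child.
        System.out.println(base.resolve("child/"));
        // RESOLVE_FILE: the name is resolved as-is; a trailing '/' would make the
        // real HadoopResourceId.resolve() throw an IllegalArgumentException.
        System.out.println(base.resolve("file.txt"));
      }
    }
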
http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index cf86c36..14591d8 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -63,14 +63,13 @@ public class HadoopFileSystemTest {
@Rule public TestPipeline p = TestPipeline.create();
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule public ExpectedException thrown = ExpectedException.none();
- private Configuration configuration;
private MiniDFSCluster hdfsCluster;
private URI hdfsClusterBaseUri;
private HadoopFileSystem fileSystem;
@Before
public void setUp() throws Exception {
- configuration = new Configuration();
+ Configuration configuration = new Configuration();
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
hdfsCluster = builder.build();
http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
index b0d821b..f179132 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -18,9 +18,11 @@
package org.apache.beam.sdk.io.hdfs;
import java.net.URI;
+import java.util.Collections;
import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdTester;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@@ -33,21 +35,25 @@ import org.junit.rules.TemporaryFolder;
* Tests for {@link HadoopResourceId}.
*/
public class HadoopResourceIdTest {
- private Configuration configuration;
+
private MiniDFSCluster hdfsCluster;
private URI hdfsClusterBaseUri;
- private HadoopFileSystem fileSystem;
+
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
@Before
public void setUp() throws Exception {
- configuration = new Configuration();
+ Configuration configuration = new Configuration();
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
hdfsCluster = builder.build();
hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
- fileSystem = new HadoopFileSystem(configuration);
+
+ // Register HadoopFileSystem for this test.
+ HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
+ options.setHdfsConfiguration(Collections.singletonList(configuration));
+ FileSystems.setDefaultConfigInWorkers(options);
}
@After
@@ -57,7 +63,9 @@ public class HadoopResourceIdTest {
@Test
public void testResourceIdTester() throws Exception {
- FileSystems.setDefaultConfigInWorkers(TestPipeline.testingPipelineOptions());
- ResourceIdTester.runResourceIdBattery(new HadoopResourceId(hdfsClusterBaseUri));
+ ResourceId baseDirectory =
+ FileSystems.matchNewResource(
+ "hdfs://" + hdfsClusterBaseUri.getPath(), true /* isDirectory */);
+ ResourceIdTester.runResourceIdBattery(baseDirectory);
}
}