You are viewing a plain text version of this content; the canonical HTML version is available at the original archive link.
Posted to commits@beam.apache.org by ti...@apache.org on 2019/03/19 16:10:52 UTC
[beam] branch master updated: [BEAM-6851] Support recursive globs in HadoopFileSystem.
This is an automated email from the ASF dual-hosted git repository.
timrobertson100 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d4417d7 [BEAM-6851] Support recursive globs in HadoopFileSystem.
new dc02997 Merge pull request #8079: [BEAM-6851] Support recursive globs in HadoopFileSystem.
d4417d7 is described below
commit d4417d77e397b44cabbaf681740e8ea690b5d54a
Author: Kyle Winkelman <ky...@optum.com>
AuthorDate: Mon Mar 18 11:51:36 2019 -0500
[BEAM-6851] Support recursive globs in HadoopFileSystem.
---
.../apache/beam/sdk/io/hdfs/HadoopFileSystem.java | 88 ++++++++++++++++++----
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 88 ++++++++++++++++++++++
2 files changed, 160 insertions(+), 16 deletions(-)
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 4eefc86..72e653b 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -29,7 +29,9 @@ import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -87,26 +89,28 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
for (String spec : specs) {
try {
+ Set<Metadata> metadata = new HashSet<>();
+
FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
- if (fileStatuses == null) {
- resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
- continue;
+ if (fileStatuses != null) {
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isFile()) {
+ metadata.add(toMetadata(fileStatus));
+ }
+ }
}
- List<Metadata> metadata = new ArrayList<>();
- for (FileStatus fileStatus : fileStatuses) {
- if (fileStatus.isFile()) {
- URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
- metadata.add(
- Metadata.builder()
- .setResourceId(new HadoopResourceId(uri))
- .setIsReadSeekEfficient(true)
- .setSizeBytes(fileStatus.getLen())
- .setLastModifiedMillis(fileStatus.getModificationTime())
- .build());
- }
+ if (spec.contains("**")) {
+ int index = spec.indexOf("**");
+ metadata.addAll(
+ matchRecursiveGlob(spec.substring(0, index + 1), spec.substring(index + 1)));
+ }
+
+ if (metadata.isEmpty()) {
+ resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
+ } else {
+ resultsBuilder.add(MatchResult.create(Status.OK, new ArrayList<>(metadata)));
}
- resultsBuilder.add(MatchResult.create(Status.OK, metadata));
} catch (IOException e) {
resultsBuilder.add(MatchResult.create(Status.ERROR, e));
}
@@ -114,6 +118,58 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
return resultsBuilder.build();
}
+ private Set<Metadata> matchRecursiveGlob(String directorySpec, String fileSpec)
+ throws IOException {
+ Set<Metadata> metadata = new HashSet<>();
+ if (directorySpec.contains("*")) {
+ // An abstract directory with a wildcard is converted to concrete directories to search.
+ FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec));
+ for (FileStatus directoryStatus : directoryStatuses) {
+ if (directoryStatus.isDirectory()) {
+ metadata.addAll(
+ matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
+ }
+ }
+ } else {
+ // A concrete directory is searched.
+ FileStatus[] fileStatuses = fileSystem.globStatus(new Path(directorySpec + "/" + fileSpec));
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isFile()) {
+ metadata.add(toMetadata(fileStatus));
+ }
+ }
+
+ // All sub-directories of a concrete directory are searched.
+ FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec + "/*"));
+ for (FileStatus directoryStatus : directoryStatuses) {
+ if (directoryStatus.isDirectory()) {
+ metadata.addAll(
+ matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
+ }
+ }
+
+ // Handle additional instances of recursive globs.
+ if (fileSpec.contains("**")) {
+ int index = fileSpec.indexOf("**");
+ metadata.addAll(
+ matchRecursiveGlob(
+ directorySpec + "/" + fileSpec.substring(0, index + 1),
+ fileSpec.substring(index + 1)));
+ }
+ }
+ return metadata;
+ }
+
+ private Metadata toMetadata(FileStatus fileStatus) {
+ URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
+ return Metadata.builder()
+ .setResourceId(new HadoopResourceId(uri))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes(fileStatus.getLen())
+ .setLastModifiedMillis(fileStatus.getModificationTime())
+ .build();
+ }
+
@Override
protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
throws IOException {
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index acb550f..f8ef812 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -242,6 +242,94 @@ public class HadoopFileSystemTest {
}
@Test
+ public void testMatchForRecursiveGlob() throws Exception {
+ create("1/testFile1", "testData1".getBytes(StandardCharsets.UTF_8));
+ create("1/A/testFile1A", "testData1A".getBytes(StandardCharsets.UTF_8));
+ create("1/A/A/testFile1AA", "testData1AA".getBytes(StandardCharsets.UTF_8));
+ create("1/B/testFile1B", "testData1B".getBytes(StandardCharsets.UTF_8));
+
+ // ensure files exist
+ assertArrayEquals("testData1".getBytes(StandardCharsets.UTF_8), read("1/testFile1", 0));
+ assertArrayEquals("testData1A".getBytes(StandardCharsets.UTF_8), read("1/A/testFile1A", 0));
+ assertArrayEquals("testData1AA".getBytes(StandardCharsets.UTF_8), read("1/A/A/testFile1AA", 0));
+ assertArrayEquals("testData1B".getBytes(StandardCharsets.UTF_8), read("1/B/testFile1B", 0));
+
+ List<MatchResult> matchResults =
+ fileSystem.match(ImmutableList.of(testPath("**testFile1*").toString()));
+
+ assertThat(matchResults, hasSize(1));
+ assertThat(
+ Iterables.getOnlyElement(matchResults).metadata(),
+ containsInAnyOrder(
+ Metadata.builder()
+ .setResourceId(testPath("1/testFile1"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testData1".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("1/testFile1"))
+ .build(),
+ Metadata.builder()
+ .setResourceId(testPath("1/A/testFile1A"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testData1A".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("1/A/testFile1A"))
+ .build(),
+ Metadata.builder()
+ .setResourceId(testPath("1/A/A/testFile1AA"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testData1AA".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("1/A/A/testFile1AA"))
+ .build(),
+ Metadata.builder()
+ .setResourceId(testPath("1/B/testFile1B"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testData1B".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("1/B/testFile1B"))
+ .build()));
+
+ matchResults =
+ fileSystem.match(
+ ImmutableList.of(
+ testPath("1**File1A").toString(),
+ testPath("1**A**testFile1AA").toString(),
+ testPath("1/B**").toString(),
+ testPath("2**").toString()));
+
+ final List<MatchResult> expected =
+ ImmutableList.of(
+ MatchResult.create(
+ Status.OK,
+ ImmutableList.of(
+ Metadata.builder()
+ .setResourceId(testPath("1/A/testFile1A"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testData1A".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("1/A/testFile1A"))
+ .build())),
+ MatchResult.create(
+ Status.OK,
+ ImmutableList.of(
+ Metadata.builder()
+ .setResourceId(testPath("1/A/A/testFile1AA"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testData1AA".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("1/A/A/testFile1AA"))
+ .build())),
+ MatchResult.create(
+ Status.OK,
+ ImmutableList.of(
+ Metadata.builder()
+ .setResourceId(testPath("1/B/testFile1B"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testData1B".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("1/B/testFile1B"))
+ .build())),
+ MatchResult.create(Status.NOT_FOUND, ImmutableList.of()));
+
+ assertThat(matchResults, hasSize(4));
+ assertThat(matchResults, equalTo(expected));
+ }
+
+ @Test
public void testRename() throws Exception {
create("testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
create("testFileB", "testDataB".getBytes(StandardCharsets.UTF_8));