You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@beam.apache.org by ti...@apache.org on 2019/03/19 16:10:52 UTC

[beam] branch master updated: [BEAM-6851] Support recursive globs in HadoopFileSystem.

This is an automated email from the ASF dual-hosted git repository.

timrobertson100 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d4417d7  [BEAM-6851] Support recursive globs in HadoopFileSystem.
     new dc02997  Merge pull request #8079: [BEAM-6851] Support recursive globs in HadoopFileSystem.
d4417d7 is described below

commit d4417d77e397b44cabbaf681740e8ea690b5d54a
Author: Kyle Winkelman <ky...@optum.com>
AuthorDate: Mon Mar 18 11:51:36 2019 -0500

    [BEAM-6851] Support recursive globs in HadoopFileSystem.
---
 .../apache/beam/sdk/io/hdfs/HadoopFileSystem.java  | 88 ++++++++++++++++++----
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java     | 88 ++++++++++++++++++++++
 2 files changed, 160 insertions(+), 16 deletions(-)

diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 4eefc86..72e653b 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -29,7 +29,9 @@ import java.nio.file.FileAlreadyExistsException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -87,26 +89,28 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
     ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
     for (String spec : specs) {
       try {
+        Set<Metadata> metadata = new HashSet<>();
+
         FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
-        if (fileStatuses == null) {
-          resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
-          continue;
+        if (fileStatuses != null) {
+          for (FileStatus fileStatus : fileStatuses) {
+            if (fileStatus.isFile()) {
+              metadata.add(toMetadata(fileStatus));
+            }
+          }
         }
 
-        List<Metadata> metadata = new ArrayList<>();
-        for (FileStatus fileStatus : fileStatuses) {
-          if (fileStatus.isFile()) {
-            URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
-            metadata.add(
-                Metadata.builder()
-                    .setResourceId(new HadoopResourceId(uri))
-                    .setIsReadSeekEfficient(true)
-                    .setSizeBytes(fileStatus.getLen())
-                    .setLastModifiedMillis(fileStatus.getModificationTime())
-                    .build());
-          }
+        if (spec.contains("**")) {
+          int index = spec.indexOf("**");
+          metadata.addAll(
+              matchRecursiveGlob(spec.substring(0, index + 1), spec.substring(index + 1)));
+        }
+
+        if (metadata.isEmpty()) {
+          resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
+        } else {
+          resultsBuilder.add(MatchResult.create(Status.OK, new ArrayList<>(metadata)));
         }
-        resultsBuilder.add(MatchResult.create(Status.OK, metadata));
       } catch (IOException e) {
         resultsBuilder.add(MatchResult.create(Status.ERROR, e));
       }
@@ -114,6 +118,58 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
     return resultsBuilder.build();
   }
 
+  private Set<Metadata> matchRecursiveGlob(String directorySpec, String fileSpec)
+      throws IOException {
+    Set<Metadata> metadata = new HashSet<>();
+    if (directorySpec.contains("*")) {
+      // An abstract directory with a wildcard is converted to concrete directories to search.
+      FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec));
+      for (FileStatus directoryStatus : directoryStatuses) {
+        if (directoryStatus.isDirectory()) {
+          metadata.addAll(
+              matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
+        }
+      }
+    } else {
+      // A concrete directory is searched.
+      FileStatus[] fileStatuses = fileSystem.globStatus(new Path(directorySpec + "/" + fileSpec));
+      for (FileStatus fileStatus : fileStatuses) {
+        if (fileStatus.isFile()) {
+          metadata.add(toMetadata(fileStatus));
+        }
+      }
+
+      // All sub-directories of a concrete directory are searched.
+      FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec + "/*"));
+      for (FileStatus directoryStatus : directoryStatuses) {
+        if (directoryStatus.isDirectory()) {
+          metadata.addAll(
+              matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
+        }
+      }
+
+      // Handle additional instances of recursive globs.
+      if (fileSpec.contains("**")) {
+        int index = fileSpec.indexOf("**");
+        metadata.addAll(
+            matchRecursiveGlob(
+                directorySpec + "/" + fileSpec.substring(0, index + 1),
+                fileSpec.substring(index + 1)));
+      }
+    }
+    return metadata;
+  }
+
+  private Metadata toMetadata(FileStatus fileStatus) {
+    URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
+    return Metadata.builder()
+        .setResourceId(new HadoopResourceId(uri))
+        .setIsReadSeekEfficient(true)
+        .setSizeBytes(fileStatus.getLen())
+        .setLastModifiedMillis(fileStatus.getModificationTime())
+        .build();
+  }
+
   @Override
   protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
       throws IOException {
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index acb550f..f8ef812 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -242,6 +242,94 @@ public class HadoopFileSystemTest {
   }
 
   @Test
+  public void testMatchForRecursiveGlob() throws Exception {
+    create("1/testFile1", "testData1".getBytes(StandardCharsets.UTF_8));
+    create("1/A/testFile1A", "testData1A".getBytes(StandardCharsets.UTF_8));
+    create("1/A/A/testFile1AA", "testData1AA".getBytes(StandardCharsets.UTF_8));
+    create("1/B/testFile1B", "testData1B".getBytes(StandardCharsets.UTF_8));
+
+    // ensure files exist
+    assertArrayEquals("testData1".getBytes(StandardCharsets.UTF_8), read("1/testFile1", 0));
+    assertArrayEquals("testData1A".getBytes(StandardCharsets.UTF_8), read("1/A/testFile1A", 0));
+    assertArrayEquals("testData1AA".getBytes(StandardCharsets.UTF_8), read("1/A/A/testFile1AA", 0));
+    assertArrayEquals("testData1B".getBytes(StandardCharsets.UTF_8), read("1/B/testFile1B", 0));
+
+    List<MatchResult> matchResults =
+        fileSystem.match(ImmutableList.of(testPath("**testFile1*").toString()));
+
+    assertThat(matchResults, hasSize(1));
+    assertThat(
+        Iterables.getOnlyElement(matchResults).metadata(),
+        containsInAnyOrder(
+            Metadata.builder()
+                .setResourceId(testPath("1/testFile1"))
+                .setIsReadSeekEfficient(true)
+                .setSizeBytes("testData1".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("1/testFile1"))
+                .build(),
+            Metadata.builder()
+                .setResourceId(testPath("1/A/testFile1A"))
+                .setIsReadSeekEfficient(true)
+                .setSizeBytes("testData1A".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("1/A/testFile1A"))
+                .build(),
+            Metadata.builder()
+                .setResourceId(testPath("1/A/A/testFile1AA"))
+                .setIsReadSeekEfficient(true)
+                .setSizeBytes("testData1AA".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("1/A/A/testFile1AA"))
+                .build(),
+            Metadata.builder()
+                .setResourceId(testPath("1/B/testFile1B"))
+                .setIsReadSeekEfficient(true)
+                .setSizeBytes("testData1B".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("1/B/testFile1B"))
+                .build()));
+
+    matchResults =
+        fileSystem.match(
+            ImmutableList.of(
+                testPath("1**File1A").toString(),
+                testPath("1**A**testFile1AA").toString(),
+                testPath("1/B**").toString(),
+                testPath("2**").toString()));
+
+    final List<MatchResult> expected =
+        ImmutableList.of(
+            MatchResult.create(
+                Status.OK,
+                ImmutableList.of(
+                    Metadata.builder()
+                        .setResourceId(testPath("1/A/testFile1A"))
+                        .setIsReadSeekEfficient(true)
+                        .setSizeBytes("testData1A".getBytes(StandardCharsets.UTF_8).length)
+                        .setLastModifiedMillis(lastModified("1/A/testFile1A"))
+                        .build())),
+            MatchResult.create(
+                Status.OK,
+                ImmutableList.of(
+                    Metadata.builder()
+                        .setResourceId(testPath("1/A/A/testFile1AA"))
+                        .setIsReadSeekEfficient(true)
+                        .setSizeBytes("testData1AA".getBytes(StandardCharsets.UTF_8).length)
+                        .setLastModifiedMillis(lastModified("1/A/A/testFile1AA"))
+                        .build())),
+            MatchResult.create(
+                Status.OK,
+                ImmutableList.of(
+                    Metadata.builder()
+                        .setResourceId(testPath("1/B/testFile1B"))
+                        .setIsReadSeekEfficient(true)
+                        .setSizeBytes("testData1B".getBytes(StandardCharsets.UTF_8).length)
+                        .setLastModifiedMillis(lastModified("1/B/testFile1B"))
+                        .build())),
+            MatchResult.create(Status.NOT_FOUND, ImmutableList.of()));
+
+    assertThat(matchResults, hasSize(4));
+    assertThat(matchResults, equalTo(expected));
+  }
+
+  @Test
   public void testRename() throws Exception {
     create("testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
     create("testFileB", "testDataB".getBytes(StandardCharsets.UTF_8));