You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/27 21:42:02 UTC

[pinot] branch master updated: collect file info like mtime, length while listing files for free (#9466)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f92392b90 collect file info like mtime, length while listing files for free (#9466)
9f92392b90 is described below

commit 9f92392b902fb9f25ce0af63bae9d925a4b9525b
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Sep 27 14:41:57 2022 -0700

    collect file info like mtime, length while listing files for free (#9466)
---
 .../apache/pinot/plugin/filesystem/GcsPinotFS.java | 61 ++++++++++++---
 .../{TestGcsPinotFS.java => GcsPinotFSTest.java}   | 69 ++++++++++++++++-
 .../{TestGcsUri.java => GcsUriTest.java}           |  2 +-
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 58 ++++++++++----
 .../pinot/plugin/filesystem/S3PinotFSTest.java     | 60 ++++++++++++++-
 .../apache/pinot/spi/filesystem/FileMetadata.java  | 89 ++++++++++++++++++++++
 .../apache/pinot/spi/filesystem/LocalPinotFS.java  | 19 +++++
 .../org/apache/pinot/spi/filesystem/PinotFS.java   | 14 ++++
 .../pinot/spi/filesystem/LocalPinotFSTest.java     | 60 +++++++++++++++
 9 files changed, 399 insertions(+), 33 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
index dc5a1787e2..1092dd9505 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
@@ -43,9 +43,11 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,7 +161,50 @@ public class GcsPinotFS extends BasePinotFS {
   @Override
   public String[] listFiles(URI fileUri, boolean recursive)
       throws IOException {
-    return listFiles(new GcsUri(fileUri), recursive);
+    return listFilesFromGcsUri(new GcsUri(fileUri), recursive);
+  }
+
+  private String[] listFilesFromGcsUri(GcsUri gcsFileUri, boolean recursive)
+      throws IOException {
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    String prefix = gcsFileUri.getPrefix();
+    String bucketName = gcsFileUri.getBucketName();
+    visitFiles(gcsFileUri, recursive, blob -> {
+      if (!blob.getName().equals(prefix)) {
+        builder.add(GcsUri.createGcsUri(bucketName, blob.getName()).toString());
+      }
+    });
+    String[] listedFiles = builder.build().toArray(new String[0]);
+    LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, gcsFileUri, recursive);
+    return listedFiles;
+  }
+
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+      throws IOException {
+    ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+    GcsUri gcsFileUri = new GcsUri(fileUri);
+    String prefix = gcsFileUri.getPrefix();
+    String bucketName = gcsFileUri.getBucketName();
+    visitFiles(gcsFileUri, recursive, blob -> {
+      if (!blob.getName().equals(prefix)) {
+        // Note: isDirectory flag is only set when listing with BlobListOption.currentDirectory() i.e. non-recursively.
+        // For simplicity, we treat a path as a directory if it ends with '/', as done in S3PinotFS.
+        boolean isDirectory = blob.getName().endsWith(GcsUri.DELIMITER);
+        FileMetadata.Builder fileBuilder =
+            new FileMetadata.Builder().setFilePath(GcsUri.createGcsUri(bucketName, blob.getName()).toString())
+                .setLength(blob.getSize()).setIsDirectory(isDirectory);
+        if (!isDirectory) {
+          // Note: for a directory, getUpdateTime() returns null (its return type is the boxed Long), so passing it to
+          // setLastModifiedTime(long) would throw an NPE on unboxing. So skip this for directories.
+          fileBuilder.setLastModifiedTime(blob.getUpdateTime());
+        }
+        listBuilder.add(fileBuilder.build());
+      }
+    });
+    ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+    LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), gcsFileUri, recursive);
+    return listedFiles;
   }
 
   @Override
@@ -299,10 +344,9 @@ public class GcsPinotFS extends BasePinotFS {
     return isEmpty;
   }
 
-  private String[] listFiles(GcsUri fileUri, boolean recursive)
+  private void visitFiles(GcsUri fileUri, boolean recursive, Consumer<Blob> visitor)
       throws IOException {
     try {
-      ImmutableList.Builder<String> builder = ImmutableList.builder();
       String prefix = fileUri.getPrefix();
       Page<Blob> page;
       if (recursive) {
@@ -311,14 +355,7 @@ public class GcsPinotFS extends BasePinotFS {
         page = _storage.list(fileUri.getBucketName(), Storage.BlobListOption.prefix(prefix),
             Storage.BlobListOption.currentDirectory());
       }
-      page.iterateAll().forEach(blob -> {
-        if (!blob.getName().equals(prefix)) {
-          builder.add(GcsUri.createGcsUri(fileUri.getBucketName(), blob.getName()).toString());
-        }
-      });
-      String[] listedFiles = builder.build().toArray(new String[0]);
-      LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
-      return listedFiles;
+      page.iterateAll().forEach(visitor);
     } catch (Exception t) {
       throw new IOException(t);
     }
@@ -423,7 +460,7 @@ public class GcsPinotFS extends BasePinotFS {
       mkdir(dstUri.getUri());
     }
     boolean copySucceeded = true;
-    for (String directoryEntry : listFiles(srcUri, true)) {
+    for (String directoryEntry : listFilesFromGcsUri(srcUri, true)) {
       GcsUri srcFile = new GcsUri(URI.create(directoryEntry));
       String relativeSrcPath = srcUri.relativize(srcFile);
       GcsUri dstFile = dstUri.resolve(relativeSrcPath);
diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsPinotFS.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
similarity index 74%
rename from pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsPinotFS.java
rename to pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
index 17bbb38b26..bcde7b1556 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
@@ -32,9 +32,12 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.testng.Assert;
 import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -69,7 +72,7 @@ import static org.testng.Assert.assertTrue;
  * If credentials are not supplied then all tests are skipped.
  */
 @Test(singleThreaded = true)
-public class TestGcsPinotFS {
+public class GcsPinotFSTest {
   private static final String DATA_DIR_PREFIX = "testing-data";
 
   private GcsPinotFS _pinotFS;
@@ -223,8 +226,8 @@ public class TestGcsPinotFS {
     String directoryName = Paths.get(gcsDirectoryUri.getPath()).getFileName().toString();
     String directoryCopyName = Paths.get(gcsDirectoryUriCopy.getPath()).getFileName().toString();
     for (GcsUri element : ImmutableList.copyOf(expectedElements)) {
-      expectedElementsCopy.add(
-          createGcsUri(element.getBucketName(), element.getPath().replace(directoryName, directoryCopyName)));
+      expectedElementsCopy
+          .add(createGcsUri(element.getBucketName(), element.getPath().replace(directoryName, directoryCopyName)));
     }
     expectedElementsCopy.addAll(expectedElements);
     assertEquals(listFilesToStream(_dataDir).collect(toSet()), expectedElementsCopy);
@@ -243,4 +246,64 @@ public class TestGcsPinotFS {
     _pinotFS.move(movedFileGcsUri.getUri(), nonEmptyFileGcsUri.getUri(), false);
     assertTrue(listFilesToStream(gcsDirectoryUri).anyMatch(uri -> uri.equals(nonEmptyFileGcsUri)));
   }
+
+  @Test
+  public void testListFilesWithMetadata()
+      throws Exception {
+    skipIfNotConfigured();
+
+    // Create empty file
+    Path localTmpDir = createLocalTempDirectory();
+    Path emptyFile = localTmpDir.resolve("empty");
+    emptyFile.toFile().createNewFile();
+
+    // Create 5 subfolders with files inside.
+    int count = 5;
+    Set<String> expectedNonRecursive = new HashSet<>();
+    Set<String> expectedRecursive = new HashSet<>();
+    for (int i = 0; i < count; i++) {
+      GcsUri testDir = _dataDir.resolve("testDir" + i);
+      _pinotFS.mkdir(testDir.getUri());
+      expectedNonRecursive.add(appendSlash(testDir).toString());
+
+      GcsUri testFile = testDir.resolve("testFile" + i);
+      // Create the file by copying an empty file there.
+      _pinotFS.copyFromLocalFile(emptyFile.toFile(), testFile.getUri());
+      expectedRecursive.add(appendSlash(testDir).toString());
+      expectedRecursive.add(testFile.toString());
+    }
+    GcsUri testDirEmpty = _dataDir.resolve("testDirEmpty");
+    _pinotFS.mkdir(testDirEmpty.getUri());
+    expectedNonRecursive.add(appendSlash(testDirEmpty).toString());
+    expectedRecursive.add(appendSlash(testDirEmpty).toString());
+
+    GcsUri testRootFile = _dataDir.resolve("testRootFile");
+    _pinotFS.copyFromLocalFile(emptyFile.toFile(), testRootFile.getUri());
+    expectedNonRecursive.add(testRootFile.toString());
+    expectedRecursive.add(testRootFile.toString());
+
+    // Assert that recursive list files and nonrecursive list files are as expected
+    String[] files = _pinotFS.listFiles(_dataDir.getUri(), false);
+    Assert.assertEquals(files.length, count + 2);
+    Assert.assertTrue(expectedNonRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+    files = _pinotFS.listFiles(_dataDir.getUri(), true);
+    Assert.assertEquals(files.length, count * 2 + 2);
+    Assert.assertTrue(expectedRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+    // Assert that recursive list files and nonrecursive list files with file info are as expected
+    List<FileMetadata> fileMetadata = _pinotFS.listFilesWithMetadata(_dataDir.getUri(), false);
+    Assert.assertEquals(fileMetadata.size(), count + 2);
+    Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+    Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+    Assert.assertTrue(expectedNonRecursive
+            .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+        fileMetadata.toString());
+    fileMetadata = _pinotFS.listFilesWithMetadata(_dataDir.getUri(), true);
+    Assert.assertEquals(fileMetadata.size(), count * 2 + 2);
+    Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+    Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), count + 1);
+    Assert.assertTrue(
+        expectedRecursive.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+        fileMetadata.toString());
+  }
 }
diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsUri.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsUriTest.java
similarity index 99%
rename from pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsUri.java
rename to pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsUriTest.java
index 355a32ab46..73b2bf5982 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsUri.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsUriTest.java
@@ -27,7 +27,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertThrows;
 
 
-public class TestGcsUri {
+public class GcsUriTest {
   @Test
   public void testCreateGcsUriUsingADifferentScheme() {
     URI uri = URI.create("file://bucket/file");
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index f9cccb7d12..e2e366dfe3 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -34,9 +34,11 @@ import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -435,8 +437,48 @@ public class S3PinotFS extends BasePinotFS {
   @Override
   public String[] listFiles(URI fileUri, boolean recursive)
       throws IOException {
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    visitFiles(fileUri, recursive, s3Object -> {
+      // TODO: Looks like S3PinotFS filters out directories, inconsistent with the other implementations.
+      // Only add files and not directories
+      if (!s3Object.key().equals(fileUri.getPath()) && !s3Object.key().endsWith(DELIMITER)) {
+        builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object));
+      }
+    });
+    String[] listedFiles = builder.build().toArray(new String[0]);
+    LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
+    return listedFiles;
+  }
+
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+      throws IOException {
+    ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+    visitFiles(fileUri, recursive, s3Object -> {
+      if (!s3Object.key().equals(fileUri.getPath())) {
+        FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
+            .setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
+            .setLastModifiedTime(s3Object.lastModified().toEpochMilli()).setLength(s3Object.size())
+            .setIsDirectory(s3Object.key().endsWith(DELIMITER));
+        listBuilder.add(fileBuilder.build());
+      }
+    });
+    ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+    LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
+    return listedFiles;
+  }
+
+  private static String getNormalizedFileKey(S3Object s3Object) {
+    String fileKey = s3Object.key();
+    if (fileKey.startsWith(DELIMITER)) {
+      fileKey = fileKey.substring(1);
+    }
+    return fileKey;
+  }
+
+  private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object> visitor)
+      throws IOException {
     try {
-      ImmutableList.Builder<String> builder = ImmutableList.builder();
       String continuationToken = null;
       boolean isDone = false;
       String prefix = normalizeToDirectoryPrefix(fileUri);
@@ -457,22 +499,10 @@ public class S3PinotFS extends BasePinotFS {
         ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
         LOGGER.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
         List<S3Object> filesReturned = listObjectsV2Response.contents();
-        filesReturned.stream().forEach(object -> {
-          //Only add files and not directories
-          if (!object.key().equals(fileUri.getPath()) && !object.key().endsWith(DELIMITER)) {
-            String fileKey = object.key();
-            if (fileKey.startsWith(DELIMITER)) {
-              fileKey = fileKey.substring(1);
-            }
-            builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
-          }
-        });
+        filesReturned.forEach(visitor);
         isDone = !listObjectsV2Response.isTruncated();
         continuationToken = listObjectsV2Response.nextContinuationToken();
       }
-      String[] listedFiles = builder.build().toArray(new String[0]);
-      LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
-      return listedFiles;
     } catch (Throwable t) {
       throw new IOException(t);
     }
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index 71f3794d61..b877c29e91 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -27,7 +27,9 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -143,8 +145,12 @@ public class S3PinotFSTest {
     for (String fileName : originalFiles) {
       createEmptyFile(folder, fileName);
     }
+    // Files in sub folders should be skipped.
+    createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
+    createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
 
     String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
+    Assert.assertEquals(actualFiles.length, originalFiles.length);
 
     actualFiles = Arrays.stream(actualFiles).filter(x -> x.contains("list-2")).toArray(String[]::new);
     Assert.assertEquals(actualFiles.length, originalFiles.length);
@@ -158,24 +164,72 @@ public class S3PinotFSTest {
   public void testListFilesInFolderRecursive()
       throws Exception {
     String folder = "list-files-rec";
-    String[] nestedFolders = new String[]{"list-files-child-1", "list-files-child-2"};
+    String[] nestedFolders = new String[]{"", "list-files-child-1", "list-files-child-2"};
     String[] originalFiles = new String[]{"a-list-3.txt", "b-list-3.txt", "c-list-3.txt"};
 
     List<String> expectedResultList = new ArrayList<>();
     for (String childFolder : nestedFolders) {
-      String folderName = folder + DELIMITER + childFolder;
+      String folderName = folder + (childFolder.length() == 0 ? "" : DELIMITER + childFolder);
       for (String fileName : originalFiles) {
         createEmptyFile(folderName, fileName);
         expectedResultList.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName + DELIMITER + fileName));
       }
     }
     String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), true);
-
+    Assert.assertEquals(actualFiles.length, expectedResultList.size());
     actualFiles = Arrays.stream(actualFiles).filter(x -> x.contains("list-3")).toArray(String[]::new);
     Assert.assertEquals(actualFiles.length, expectedResultList.size());
     Assert.assertTrue(Arrays.equals(expectedResultList.toArray(), actualFiles));
   }
 
+  @Test
+  public void testListFilesWithMetadataInFolderNonRecursive()
+      throws Exception {
+    String folder = "list-files-with-md";
+    String[] originalFiles = new String[]{"a-list-2.txt", "b-list-2.txt", "c-list-2.txt"};
+    for (String fileName : originalFiles) {
+      createEmptyFile(folder, fileName);
+    }
+    // Files in sub folders should be skipped.
+    createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
+    createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
+    List<FileMetadata> actualFiles =
+        _s3PinotFS.listFilesWithMetadata(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
+    Assert.assertEquals(actualFiles.size(), originalFiles.length);
+    List<String> actualFilePaths =
+        actualFiles.stream().map(FileMetadata::getFilePath).filter(fp -> fp.contains("list-2"))
+            .collect(Collectors.toList());
+    Assert.assertEquals(actualFilePaths.size(), originalFiles.length);
+    Assert.assertEquals(Arrays.stream(originalFiles)
+        .map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName))
+        .collect(Collectors.toList()), actualFilePaths);
+  }
+
+  @Test
+  public void testListFilesWithMetadataInFolderRecursive()
+      throws Exception {
+    String folder = "list-files-rec-with-md";
+    String[] nestedFolders = new String[]{"", "list-files-child-1", "list-files-child-2"};
+    String[] originalFiles = new String[]{"a-list-3.txt", "b-list-3.txt", "c-list-3.txt"};
+
+    List<String> expectedResultList = new ArrayList<>();
+    for (String childFolder : nestedFolders) {
+      String folderName = folder + (childFolder.length() == 0 ? "" : DELIMITER + childFolder);
+      for (String fileName : originalFiles) {
+        createEmptyFile(folderName, fileName);
+        expectedResultList.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName + DELIMITER + fileName));
+      }
+    }
+    List<FileMetadata> actualFiles =
+        _s3PinotFS.listFilesWithMetadata(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), true);
+    Assert.assertEquals(actualFiles.size(), expectedResultList.size());
+    List<String> actualFilePaths =
+        actualFiles.stream().map(FileMetadata::getFilePath).filter(fp -> fp.contains("list-3"))
+            .collect(Collectors.toList());
+    Assert.assertEquals(actualFilePaths.size(), expectedResultList.size());
+    Assert.assertEquals(expectedResultList, actualFilePaths);
+  }
+
   @Test
   public void testDeleteFile()
       throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
new file mode 100644
index 0000000000..4def90b6de
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.filesystem;
+
+/**
+ * FileMetadata holds the file path plus a few optional file attributes: mtime, length and a directory flag.
+ */
+public class FileMetadata {
+  private final String _filePath;
+  private final long _lastModifiedTime;
+  private final long _length;
+  private final boolean _isDirectory;
+
+  private FileMetadata(String filePath, long lastModifiedTime, long length, boolean isDirectory) {
+    _filePath = filePath;
+    _lastModifiedTime = lastModifiedTime;
+    _length = length;
+    _isDirectory = isDirectory;
+  }
+
+  public String getFilePath() {
+    return _filePath;
+  }
+
+  public long getLastModifiedTime() {
+    return _lastModifiedTime;
+  }
+
+  public long getLength() {
+    return _length;
+  }
+
+  public boolean isDirectory() {
+    return _isDirectory;
+  }
+
+  @Override
+  public String toString() {
+    return "FileInfo{" + "_filePath='" + _filePath + '\'' + ", _lastModifiedTime=" + _lastModifiedTime + ", _length="
+        + _length + ", _isDirectory=" + _isDirectory + '}';
+  }
+
+  public static class Builder {
+    private String _filePath;
+    private long _lastModifiedTime;
+    private long _length;
+    private boolean _isDirectory;
+
+    public Builder setFilePath(String filePath) {
+      _filePath = filePath;
+      return this;
+    }
+
+    public Builder setLastModifiedTime(long lastModifiedTime) {
+      _lastModifiedTime = lastModifiedTime;
+      return this;
+    }
+
+    public Builder setLength(long length) {
+      _length = length;
+      return this;
+    }
+
+    public Builder setIsDirectory(boolean isDirectory) {
+      _isDirectory = isDirectory;
+      return this;
+    }
+
+    public FileMetadata build() {
+      return new FileMetadata(_filePath, _lastModifiedTime, _length, _isDirectory);
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
index 0dcbd00818..ed6c8f2143 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
@@ -30,6 +30,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
@@ -115,6 +117,23 @@ public class LocalPinotFS extends BasePinotFS {
     }
   }
 
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+      throws IOException {
+    File file = toFile(fileUri);
+    if (!recursive) {
+      return Arrays.stream(file.list()).map(s -> getFileInfo(new File(file, s))).collect(Collectors.toList());
+    } else {
+      return Files.walk(Paths.get(fileUri)).filter(s -> !s.equals(file.toPath())).map(p -> getFileInfo(p.toFile()))
+          .collect(Collectors.toList());
+    }
+  }
+
+  private static FileMetadata getFileInfo(File file) {
+    return new FileMetadata.Builder().setFilePath(file.getAbsolutePath()).setLastModifiedTime(file.lastModified())
+        .setLength(file.length()).setIsDirectory(file.isDirectory()).build();
+  }
+
   @Override
   public void copyToLocalFile(URI srcUri, File dstFile)
       throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 3ad2243a30..286a02c7d4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
+import java.util.List;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -156,6 +157,19 @@ public interface PinotFS extends Closeable, Serializable {
   String[] listFiles(URI fileUri, boolean recursive)
       throws IOException;
 
+  /**
+   * Like listFiles, but also returns file metadata so that callers need not call length(), isDirectory() and
+   * lastModified() separately, which can be slow and costly for remote file systems.
+   * @param fileUri location of file
+   * @param recursive if we want to list files recursively
+   * @return a list of FileMetadata, each containing the file path and a few file attributes.
+   * @throws IOException on IO failure. See specific implementation
+   */
+  default List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Copies a file from a remote filesystem to the local one. Keeps the original file.
    * @param srcUri location of current file on remote filesystem (must not be a directory)
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
index f7bff51d43..7b3338bd11 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
@@ -21,6 +21,10 @@ package org.apache.pinot.spi.filesystem;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -250,4 +254,60 @@ public class LocalPinotFSTest {
     localPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
     Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
   }
+
+  @Test
+  public void testListFilesWithMetadata()
+      throws IOException {
+    LocalPinotFS localPinotFS = new LocalPinotFS();
+    File tempDirPath = new File(_absoluteTmpDirPath, "test-list-files-with-md");
+    Assert.assertTrue(tempDirPath.mkdir());
+
+    // Create test directories, each with one file underneath
+    int count = 5;
+    List<String> expectedNonRecursive = new ArrayList<>();
+    List<String> expectedRecursive = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      File testDir = new File(tempDirPath, "testDir" + i);
+      Assert.assertTrue(testDir.mkdir());
+      expectedNonRecursive.add(testDir.getAbsolutePath());
+
+      File testFile = new File(testDir, "testFile" + i);
+      Assert.assertTrue(testFile.createNewFile());
+      expectedRecursive.add(testDir.getAbsolutePath());
+      expectedRecursive.add(testFile.getAbsolutePath());
+    }
+    File testDirEmpty = new File(tempDirPath, "testDirEmpty");
+    Assert.assertTrue(testDirEmpty.mkdir());
+    expectedNonRecursive.add(testDirEmpty.getAbsolutePath());
+    expectedRecursive.add(testDirEmpty.getAbsolutePath());
+
+    File testRootFile = new File(tempDirPath, "testRootFile");
+    Assert.assertTrue(testRootFile.createNewFile());
+    expectedNonRecursive.add(testRootFile.getAbsolutePath());
+    expectedRecursive.add(testRootFile.getAbsolutePath());
+
+    // Assert that recursive list files and nonrecursive list files are as expected
+    String[] files = localPinotFS.listFiles(tempDirPath.toURI(), false);
+    Assert.assertEquals(files.length, count + 2);
+    Assert.assertTrue(expectedNonRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+    files = localPinotFS.listFiles(tempDirPath.toURI(), true);
+    Assert.assertEquals(files.length, count * 2 + 2);
+    Assert.assertTrue(expectedRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+    // Assert that recursive list files and nonrecursive list files with file info are as expected
+    List<FileMetadata> fileMetadata = localPinotFS.listFilesWithMetadata(tempDirPath.toURI(), false);
+    Assert.assertEquals(fileMetadata.size(), count + 2);
+    Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+    Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+    Assert.assertTrue(expectedNonRecursive
+            .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+        fileMetadata.toString());
+    fileMetadata = localPinotFS.listFilesWithMetadata(tempDirPath.toURI(), true);
+    Assert.assertEquals(fileMetadata.size(), count * 2 + 2);
+    Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+    Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), count + 1);
+    Assert.assertTrue(
+        expectedRecursive.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+        fileMetadata.toString());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org