You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/09/28 17:01:25 UTC

[pinot] branch master updated: extend PinotFS impls with listFilesWithMetadata and some bugfix (#9478)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 550cb0233a extend PinotFS impls with listFilesWithMetadata and some bugfix (#9478)
550cb0233a is described below

commit 550cb0233a9bbfa4a58b83e542f5e928bbcc2d43
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Sep 28 10:01:17 2022 -0700

    extend PinotFS impls with listFilesWithMetadata and some bugfix (#9478)
---
 .../pinot/plugin/filesystem/ADLSGen2PinotFS.java   |  54 +++++++----
 .../pinot/plugin/filesystem/AzurePinotFS.java      |  65 +++++++++-----
 .../filesystem/test/ADLSGen2PinotFSTest.java       |  48 ++++++++++
 .../plugin/filesystem/test/AzurePinotFSTest.java   | 100 +++++++++++++++++++++
 .../pinot/plugin/filesystem/HadoopPinotFS.java     |  51 +++++++----
 .../pinot/plugin/filesystem/HadoopPinotFSTest.java |  62 +++++++++++++
 .../apache/pinot/spi/filesystem/FileMetadata.java  |   4 +-
 .../apache/pinot/spi/filesystem/LocalPinotFS.java  |   6 +-
 8 files changed, 330 insertions(+), 60 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index 3a3a684b21..a09220f60d 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -56,14 +56,15 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.sql.Timestamp;
 import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -182,9 +183,9 @@ public class ADLSGen2PinotFS extends BasePinotFS {
         Preconditions.checkNotNull(proxyPassword, "Proxy Password cannot be null");
 
         NettyAsyncHttpClientBuilder builder = new NettyAsyncHttpClientBuilder();
-        builder.proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
-            new InetSocketAddress(proxyHost, Integer.parseInt(proxyPort))).setCredentials(proxyUsername,
-            proxyPassword));
+        builder.proxy(
+            new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress(proxyHost, Integer.parseInt(proxyPort)))
+                .setCredentials(proxyUsername, proxyPassword));
         ClientSecretCredentialBuilder clientSecretCredentialBuilder =
             new ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId);
         clientSecretCredentialBuilder.httpClient(builder.build());
@@ -245,8 +246,9 @@ public class ADLSGen2PinotFS extends BasePinotFS {
       // By default, create directory call will overwrite if the path already exists. Setting IfNoneMatch = "*" to
       // prevent overwrite. https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
       DataLakeRequestConditions requestConditions = new DataLakeRequestConditions().setIfNoneMatch("*");
-      _fileSystemClient.createDirectoryWithResponse(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(uri), null,
-          null, null, null, requestConditions, null, null);
+      _fileSystemClient
+          .createDirectoryWithResponse(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(uri), null, null, null,
+              null, requestConditions, null, null);
       return true;
     } catch (DataLakeStorageException e) {
       // If the path already exists, doing nothing and return true
@@ -414,12 +416,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
       throws IOException {
     LOGGER.debug("listFiles is called with fileUri='{}', recursive='{}'", fileUri, recursive);
     try {
-      // Unlike other Azure SDK APIs that takes url encoded path, ListPathsOptions takes decoded url
-      // e.g) 'path/segment' instead of 'path%2Fsegment'
-      String pathForListPathsOptions =
-          Utility.urlDecode(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(fileUri));
-      ListPathsOptions options = new ListPathsOptions().setPath(pathForListPathsOptions).setRecursive(recursive);
-      PagedIterable<PathItem> iter = _fileSystemClient.listPaths(options, null);
+      PagedIterable<PathItem> iter = listPathItems(fileUri, recursive);
       return iter.stream().map(p -> AzurePinotFSUtil.convertAzureStylePathToUriStylePath(p.getName()))
           .toArray(String[]::new);
     } catch (DataLakeStorageException e) {
@@ -427,6 +424,34 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     }
   }
 
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+      throws IOException {
+    LOGGER.debug("listFilesWithMetadata is called with fileUri='{}', recursive='{}'", fileUri, recursive);
+    try {
+      PagedIterable<PathItem> iter = listPathItems(fileUri, recursive);
+      return iter.stream().map(ADLSGen2PinotFS::getFileMetadata).collect(Collectors.toList());
+    } catch (DataLakeStorageException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private PagedIterable<PathItem> listPathItems(URI fileUri, boolean recursive)
+      throws IOException {
+    // Unlike other Azure SDK APIs that takes url encoded path, ListPathsOptions takes decoded url
+    // e.g) 'path/segment' instead of 'path%2Fsegment'
+    String pathForListPathsOptions = Utility.urlDecode(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(fileUri));
+    ListPathsOptions options = new ListPathsOptions().setPath(pathForListPathsOptions).setRecursive(recursive);
+    return _fileSystemClient.listPaths(options, null);
+  }
+
+  private static FileMetadata getFileMetadata(PathItem file) {
+    String path = AzurePinotFSUtil.convertAzureStylePathToUriStylePath(file.getName());
+    return new FileMetadata.Builder().setFilePath(path)
+        .setLastModifiedTime(file.getLastModified().toInstant().toEpochMilli()).setLength(file.getContentLength())
+        .setIsDirectory(file.isDirectory()).build();
+  }
+
   /**
    * Copy a file from ADL to local location.
    *
@@ -520,8 +545,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     try {
       PathProperties pathProperties = getPathProperties(uri);
       OffsetDateTime offsetDateTime = pathProperties.getLastModified();
-      Timestamp timestamp = Timestamp.valueOf(offsetDateTime.atZoneSameInstant(ZoneOffset.UTC).toLocalDateTime());
-      return timestamp.getTime();
+      return offsetDateTime.toInstant().toEpochMilli();
     } catch (DataLakeStorageException e) {
       throw new IOException("Failed while checking lastModified time for : " + uri, e);
     }
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java
index c446f8b91e..1c6b70bf20 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.filesystem;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.microsoft.azure.datalake.store.ADLStoreClient;
 import com.microsoft.azure.datalake.store.DirectoryEntry;
 import com.microsoft.azure.datalake.store.DirectoryEntryType;
@@ -35,13 +36,15 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,36 +161,54 @@ public class AzurePinotFS extends BasePinotFS {
     if (rootDir == null) {
       return EMPTY_ARR;
     }
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    visitFiles(fileUri, recursive, f -> builder.add(f.fullName));
+    String[] listedFiles = builder.build().toArray(new String[0]);
+    LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
+    return listedFiles;
+  }
 
-    if (!recursive) {
-      List<DirectoryEntry> shallowDirectoryEntries = _adlStoreClient.enumerateDirectory(rootDir.fullName);
-      List<String> shallowDirPaths = new ArrayList<>(shallowDirectoryEntries.size());
-      for (DirectoryEntry directoryEntry : shallowDirectoryEntries) {
-        shallowDirPaths.add(directoryEntry.fullName);
-      }
-      return shallowDirPaths.toArray(new String[shallowDirPaths.size()]);
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+      throws IOException {
+    DirectoryEntry rootDir = _adlStoreClient.getDirectoryEntry(fileUri.getPath());
+    if (rootDir == null) {
+      return Collections.emptyList();
     }
+    ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+    visitFiles(fileUri, recursive, f -> {
+      FileMetadata.Builder fileBuilder =
+          new FileMetadata.Builder().setFilePath(f.fullName).setLastModifiedTime(f.lastModifiedTime.getTime())
+              .setLength(f.length).setIsDirectory(f.type.equals(DirectoryEntryType.DIRECTORY));
+      listBuilder.add(fileBuilder.build());
+    });
+    ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+    LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
+    return listedFiles;
+  }
 
-    List<DirectoryEntry> directoryEntries = listFiles(rootDir);
-    List<String> fullFilePaths = new ArrayList<>(directoryEntries.size());
-    for (DirectoryEntry directoryEntry : directoryEntries) {
-      fullFilePaths.add(directoryEntry.fullName);
+  private void visitFiles(URI fileUri, boolean recursive, Consumer<DirectoryEntry> visitor)
+      throws IOException {
+    DirectoryEntry rootDir = _adlStoreClient.getDirectoryEntry(fileUri.getPath());
+    if (rootDir == null) {
+      throw new IllegalArgumentException("fileUri does not exist: " + fileUri);
+    }
+    if (!recursive) {
+      _adlStoreClient.enumerateDirectory(rootDir.fullName).forEach(visitor);
+    } else {
+      visitFilesRecursively(rootDir, visitor);
     }
-    return fullFilePaths.toArray(new String[fullFilePaths.size()]);
   }
 
-  private List<DirectoryEntry> listFiles(DirectoryEntry origDirEntry)
+  private void visitFilesRecursively(DirectoryEntry origDirEntry, Consumer<DirectoryEntry> visitor)
       throws IOException {
-    List<DirectoryEntry> fileList = new ArrayList<>();
-    if (origDirEntry.type.equals(DirectoryEntryType.DIRECTORY)) {
-      for (DirectoryEntry directoryEntry : _adlStoreClient.enumerateDirectory(origDirEntry.fullName)) {
-        fileList.add(directoryEntry);
-        fileList.addAll(listFiles(directoryEntry));
+    for (DirectoryEntry directoryEntry : _adlStoreClient.enumerateDirectory(origDirEntry.fullName)) {
+      System.out.println("directoryEntry:" + directoryEntry.fullName);
+      visitor.accept(directoryEntry);
+      if (directoryEntry.type.equals(DirectoryEntryType.DIRECTORY)) {
+        visitFilesRecursively(directoryEntry, visitor);
       }
-    } else {
-      fileList.add(origDirEntry);
     }
-    return fileList;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
index 6a4ec9b9fd..976817dd64 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
@@ -36,10 +36,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -201,6 +206,49 @@ public class ADLSGen2PinotFSTest {
     verify(_mockPathItem).getName();
   }
 
+  @Test
+  public void testListFilesWithMetadata()
+      throws IOException {
+    when(_mockFileSystemClient.listPaths(any(), any())).thenReturn(_mockPagedIterable);
+    when(_mockPagedIterable.stream()).thenReturn(Collections.singletonList(_mockPathItem).stream());
+    when(_mockPathItem.getName()).thenReturn("foo");
+    when(_mockPathItem.isDirectory()).thenReturn(false);
+    when(_mockPathItem.getContentLength()).thenReturn(1024L);
+    OffsetDateTime mtime = OffsetDateTime.now();
+    when(_mockPathItem.getLastModified()).thenReturn(mtime);
+
+    List<FileMetadata> actual = _adlsGen2PinotFsUnderTest.listFilesWithMetadata(_mockURI, true);
+    FileMetadata fm = actual.get(0);
+    Assert.assertEquals(fm.getFilePath(), "/foo");
+    Assert.assertFalse(fm.isDirectory());
+    Assert.assertEquals(fm.getLength(), 1024);
+    Assert.assertEquals(fm.getLastModifiedTime(), mtime.toInstant().toEpochMilli());
+
+    verify(_mockFileSystemClient).listPaths(any(), any());
+    verify(_mockPagedIterable).stream();
+    verify(_mockPathItem).getName();
+    verify(_mockPathItem).isDirectory();
+    verify(_mockPathItem).getContentLength();
+    verify(_mockPathItem).getLastModified();
+  }
+
+  @Test
+  public void testLastModified()
+      throws IOException {
+    when(_mockFileSystemClient.getDirectoryClient(any())).thenReturn(_mockDirectoryClient);
+    when(_mockDirectoryClient.getProperties()).thenReturn(_mockPathProperties);
+    Instant now = Instant.now();
+    OffsetDateTime mtime = Instant.now().atOffset(ZoneOffset.UTC);
+    when(_mockPathProperties.getLastModified()).thenReturn(mtime);
+
+    long actual = _adlsGen2PinotFsUnderTest.lastModified(_mockURI);
+    Assert.assertEquals(actual, now.toEpochMilli());
+
+    verify(_mockFileSystemClient).getDirectoryClient(any());
+    verify(_mockDirectoryClient).getProperties();
+    verify(_mockPathProperties).getLastModified();
+  }
+
   @Test
   public void testListFilesException() {
     when(_mockFileSystemClient.listPaths(any(), any())).thenThrow(_mockDataLakeStorageException);
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java
index 1c3be6d4f5..61a12f6d42 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java
@@ -20,11 +20,22 @@ package org.apache.pinot.plugin.filesystem.test;
 
 import com.microsoft.azure.datalake.store.ADLFileInputStream;
 import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.DirectoryEntry;
+import com.microsoft.azure.datalake.store.DirectoryEntryType;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.plugin.filesystem.AzurePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -77,6 +88,95 @@ public class AzurePinotFSTest {
     Assert.assertTrue(file.exists());
   }
 
+  @Test
+  public void testListFilesWithMetadataNonRecursive()
+      throws IOException {
+    ADLStoreClient adlStoreClient = Mockito.mock(ADLStoreClient.class);
+    AzurePinotFS azurePinotFS = new AzurePinotFS(adlStoreClient);
+    URI baseURI = new File(_adlLocation, "testDirNonRecursive").toURI();
+
+    Date lastModifiedTime = new Date();
+    Date lastAccessTime = new Date();
+    DirectoryEntry testDir =
+        new DirectoryEntry("testDir", baseURI.toString(), 1024, null, null, lastAccessTime, lastModifiedTime,
+            DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+    Mockito.when(adlStoreClient.getDirectoryEntry(baseURI.getPath())).thenReturn(testDir);
+
+    List<DirectoryEntry> entries = new ArrayList<>();
+    DirectoryEntry dir =
+        new DirectoryEntry("dir1", testDir.fullName + "/dir1", 128, null, null, lastAccessTime, lastModifiedTime,
+            DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+    entries.add(dir);
+    DirectoryEntry file =
+        new DirectoryEntry("file1", testDir.fullName + "/file1", 1024, null, null, lastAccessTime, lastModifiedTime,
+            DirectoryEntryType.FILE, 128, 1, null, false, null);
+    entries.add(file);
+    Mockito.when(adlStoreClient.enumerateDirectory(Mockito.anyString())).thenReturn(entries);
+
+    String[] files = azurePinotFS.listFiles(baseURI, false);
+    Assert.assertEquals(files.length, 2);
+    Assert
+        .assertTrue(entries.stream().map(d -> d.fullName).collect(Collectors.toSet()).containsAll(Arrays.asList(files)),
+            Arrays.toString(files));
+
+    List<FileMetadata> fileMetadata = azurePinotFS.listFilesWithMetadata(baseURI, false);
+    Assert.assertEquals(fileMetadata.size(), 2);
+    Assert.assertTrue(entries.stream().map(d -> d.fullName).collect(Collectors.toSet())
+            .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+        fileMetadata.toString());
+    Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), 1);
+    Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+  }
+
+  @Test
+  public void testListFilesWithMetadataRecursive()
+      throws IOException {
+    ADLStoreClient adlStoreClient = Mockito.mock(ADLStoreClient.class);
+    AzurePinotFS azurePinotFS = new AzurePinotFS(adlStoreClient);
+    URI baseURI = new File(_adlLocation, "testDirRecursive").toURI();
+
+    Date lastModifiedTime = new Date();
+    Date lastAccessTime = new Date();
+    DirectoryEntry testDir =
+        new DirectoryEntry("testDir", baseURI.toString(), 128, null, null, lastAccessTime, lastModifiedTime,
+            DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+    Mockito.when(adlStoreClient.getDirectoryEntry(baseURI.getPath())).thenReturn(testDir);
+
+    Set<String> expected = new HashSet<>();
+    List<DirectoryEntry> dirEntries = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      DirectoryEntry dir = new DirectoryEntry("dir" + i, testDir.fullName + "/dir" + i, 128, null, null, lastAccessTime,
+          lastModifiedTime, DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+      dirEntries.add(dir);
+      expected.add(dir.fullName);
+    }
+    List<DirectoryEntry> fileEntries = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      DirectoryEntry file =
+          new DirectoryEntry("file" + i, testDir.fullName + "/dir2/file" + i, 1024, null, null, lastAccessTime,
+              lastModifiedTime, DirectoryEntryType.FILE, 128, 1, null, false, null);
+      fileEntries.add(file);
+      expected.add(file.fullName);
+    }
+
+    // Prepare the mock for calling listFiles() and listFilesWithMetadata().
+    Mockito.when(adlStoreClient.enumerateDirectory(Mockito.anyString()))
+        .thenReturn(dirEntries, Collections.emptyList(), fileEntries /* for listFiles() */, dirEntries,
+            Collections.emptyList(), fileEntries /* for listFilesWithMetadata() */);
+
+    String[] files = azurePinotFS.listFiles(baseURI, true);
+    Assert.assertEquals(files.length, 5);
+    Assert.assertTrue(expected.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+    List<FileMetadata> fileMetadata = azurePinotFS.listFilesWithMetadata(baseURI, true);
+    Assert.assertEquals(fileMetadata.size(), 5);
+    Assert.assertTrue(
+        expected.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+        fileMetadata.toString());
+    Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), 2);
+    Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 3);
+  }
+
   @AfterClass
   public void tearDown() {
     new File(_adlLocation).delete();
diff --git a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
index 5de0ceea33..f65ec1945f 100644
--- a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
@@ -20,13 +20,14 @@
 package org.apache.pinot.plugin.filesystem;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,34 +142,47 @@ public class HadoopPinotFS extends BasePinotFS {
   @Override
   public String[] listFiles(URI fileUri, boolean recursive)
       throws IOException {
-    ArrayList<String> filePathStrings = new ArrayList<>();
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    visitFiles(fileUri, recursive, f -> builder.add(f.getPath().toString()));
+    String[] listedFiles = builder.build().toArray(new String[0]);
+    LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
+    return listedFiles;
+  }
+
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+      throws IOException {
+    ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+    visitFiles(fileUri, recursive, f -> {
+      FileMetadata.Builder fileBuilder =
+          new FileMetadata.Builder().setFilePath(f.getPath().toString()).setLastModifiedTime(f.getModificationTime())
+              .setLength(f.getLen()).setIsDirectory(f.isDirectory());
+      listBuilder.add(fileBuilder.build());
+    });
+    ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+    LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
+    return listedFiles;
+  }
+
+  private void visitFiles(URI fileUri, boolean recursive, Consumer<FileStatus> visitor)
+      throws IOException {
     Path path = new Path(fileUri);
-    if (_hadoopFS.exists(path)) {
-      // _hadoopFS.listFiles(path, false) will not return directories as files, thus use listStatus(path) here.
-      List<FileStatus> files = listStatus(path, recursive);
-      for (FileStatus file : files) {
-        filePathStrings.add(file.getPath().toString());
-      }
-    } else {
+    if (!_hadoopFS.exists(path)) {
       throw new IllegalArgumentException("fileUri does not exist: " + fileUri);
     }
-    String[] retArray = new String[filePathStrings.size()];
-    filePathStrings.toArray(retArray);
-    return retArray;
+    visitFileStatus(path, recursive, visitor);
   }
 
-  private List<FileStatus> listStatus(Path path, boolean recursive)
+  private void visitFileStatus(Path path, boolean recursive, Consumer<FileStatus> visitor)
       throws IOException {
-    List<FileStatus> fileStatuses = new ArrayList<>();
+    // _hadoopFS.listFiles(path, false) will not return directories as files, thus use listStatus(path) here.
     FileStatus[] files = _hadoopFS.listStatus(path);
     for (FileStatus file : files) {
-      fileStatuses.add(file);
+      visitor.accept(file);
       if (file.isDirectory() && recursive) {
-        List<FileStatus> subFiles = listStatus(file.getPath(), true);
-        fileStatuses.addAll(subFiles);
+        visitFileStatus(file.getPath(), true, visitor);
       }
     }
-    return fileStatuses;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
index 7434aeeaf0..2ba85994b4 100644
--- a/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
@@ -21,8 +21,13 @@ package org.apache.pinot.plugin.filesystem;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -52,4 +57,61 @@ public class HadoopPinotFSTest {
       hadoopFS.delete(baseURI, true);
     }
   }
+
+  @Test
+  public void testListFilesWithMetadata()
+      throws IOException {
+    URI baseURI = URI.create(TMP_DIR + "/HadoopPinotFSTestListFiles");
+    try (HadoopPinotFS hadoopFS = new HadoopPinotFS()) {
+      hadoopFS.init(new PinotConfiguration());
+
+      // Create a testDir and file underneath directory
+      int count = 5;
+      List<String> expectedNonRecursive = new ArrayList<>();
+      List<String> expectedRecursive = new ArrayList<>();
+      for (int i = 0; i < count; i++) {
+        URI testDir = new Path(baseURI.getPath(), "testDir" + i).toUri();
+        hadoopFS.mkdir(testDir);
+        expectedNonRecursive.add((testDir.getScheme() == null ? "file:" : "") + testDir);
+
+        URI testFile = new Path(testDir.getPath(), "testFile" + i).toUri();
+        hadoopFS.touch(testFile);
+        expectedRecursive.add((testDir.getScheme() == null ? "file:" : "") + testDir);
+        expectedRecursive.add((testDir.getScheme() == null ? "file:" : "") + testFile);
+      }
+      URI testDirEmpty = new Path(baseURI.getPath(), "testDirEmpty").toUri();
+      hadoopFS.mkdir(testDirEmpty);
+      expectedNonRecursive.add((testDirEmpty.getScheme() == null ? "file:" : "") + testDirEmpty);
+      expectedRecursive.add((testDirEmpty.getScheme() == null ? "file:" : "") + testDirEmpty);
+
+      URI testRootFile = new Path(baseURI.getPath(), "testRootFile").toUri();
+      hadoopFS.touch(testRootFile);
+      expectedNonRecursive.add((testRootFile.getScheme() == null ? "file:" : "") + testRootFile);
+      expectedRecursive.add((testRootFile.getScheme() == null ? "file:" : "") + testRootFile);
+
+      // Assert that recursive list files and nonrecursive list files are as expected
+      String[] files = hadoopFS.listFiles(baseURI, false);
+      Assert.assertEquals(files.length, count + 2);
+      Assert.assertTrue(expectedNonRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+      files = hadoopFS.listFiles(baseURI, true);
+      Assert.assertEquals(files.length, count * 2 + 2);
+      Assert.assertTrue(expectedRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+      // Assert that recursive list files and nonrecursive list files with file info are as expected
+      List<FileMetadata> fileMetadata = hadoopFS.listFilesWithMetadata(baseURI, false);
+      Assert.assertEquals(fileMetadata.size(), count + 2);
+      Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+      Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+      Assert.assertTrue(expectedNonRecursive
+              .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+          fileMetadata.toString());
+      fileMetadata = hadoopFS.listFilesWithMetadata(baseURI, true);
+      Assert.assertEquals(fileMetadata.size(), count * 2 + 2);
+      Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+      Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), count + 1);
+      Assert.assertTrue(expectedRecursive
+              .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+          fileMetadata.toString());
+    }
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
index 4def90b6de..a5e90b0de4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
@@ -52,8 +52,8 @@ public class FileMetadata {
 
   @Override
   public String toString() {
-    return "FileInfo{" + "_filePath='" + _filePath + '\'' + ", _lastModifiedTime=" + _lastModifiedTime + ", _length="
-        + _length + ", _isDirectory=" + _isDirectory + '}';
+    return "FileMetadata{" + "_filePath='" + _filePath + '\'' + ", _lastModifiedTime=" + _lastModifiedTime
+        + ", _length=" + _length + ", _isDirectory=" + _isDirectory + '}';
   }
 
   public static class Builder {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
index ed6c8f2143..5eae4d9267 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
@@ -122,14 +122,14 @@ public class LocalPinotFS extends BasePinotFS {
       throws IOException {
     File file = toFile(fileUri);
     if (!recursive) {
-      return Arrays.stream(file.list()).map(s -> getFileInfo(new File(file, s))).collect(Collectors.toList());
+      return Arrays.stream(file.list()).map(s -> getFileMetadata(new File(file, s))).collect(Collectors.toList());
     } else {
-      return Files.walk(Paths.get(fileUri)).filter(s -> !s.equals(file.toPath())).map(p -> getFileInfo(p.toFile()))
+      return Files.walk(Paths.get(fileUri)).filter(s -> !s.equals(file.toPath())).map(p -> getFileMetadata(p.toFile()))
           .collect(Collectors.toList());
     }
   }
 
-  private static FileMetadata getFileInfo(File file) {
+  private static FileMetadata getFileMetadata(File file) {
     return new FileMetadata.Builder().setFilePath(file.getAbsolutePath()).setLastModifiedTime(file.lastModified())
         .setLength(file.length()).setIsDirectory(file.isDirectory()).build();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org