You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/27 21:42:02 UTC
[pinot] branch master updated: collect file info like mtime, length while listing files for free (#9466)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9f92392b90 collect file info like mtime, length while listing files for free (#9466)
9f92392b90 is described below
commit 9f92392b902fb9f25ce0af63bae9d925a4b9525b
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Sep 27 14:41:57 2022 -0700
collect file info like mtime, length while listing files for free (#9466)
---
.../apache/pinot/plugin/filesystem/GcsPinotFS.java | 61 ++++++++++++---
.../{TestGcsPinotFS.java => GcsPinotFSTest.java} | 69 ++++++++++++++++-
.../{TestGcsUri.java => GcsUriTest.java} | 2 +-
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 58 ++++++++++----
.../pinot/plugin/filesystem/S3PinotFSTest.java | 60 ++++++++++++++-
.../apache/pinot/spi/filesystem/FileMetadata.java | 89 ++++++++++++++++++++++
.../apache/pinot/spi/filesystem/LocalPinotFS.java | 19 +++++
.../org/apache/pinot/spi/filesystem/PinotFS.java | 14 ++++
.../pinot/spi/filesystem/LocalPinotFSTest.java | 60 +++++++++++++++
9 files changed, 399 insertions(+), 33 deletions(-)
diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
index dc5a1787e2..1092dd9505 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
@@ -43,9 +43,11 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,7 +161,50 @@ public class GcsPinotFS extends BasePinotFS {
@Override
public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
- return listFiles(new GcsUri(fileUri), recursive);
+ return listFilesFromGcsUri(new GcsUri(fileUri), recursive);
+ }
+
+ private String[] listFilesFromGcsUri(GcsUri gcsFileUri, boolean recursive)
+ throws IOException {
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ String prefix = gcsFileUri.getPrefix();
+ String bucketName = gcsFileUri.getBucketName();
+ visitFiles(gcsFileUri, recursive, blob -> {
+ if (!blob.getName().equals(prefix)) {
+ builder.add(GcsUri.createGcsUri(bucketName, blob.getName()).toString());
+ }
+ });
+ String[] listedFiles = builder.build().toArray(new String[0]);
+ LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, gcsFileUri, recursive);
+ return listedFiles;
+ }
+
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+ throws IOException {
+ ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+ GcsUri gcsFileUri = new GcsUri(fileUri);
+ String prefix = gcsFileUri.getPrefix();
+ String bucketName = gcsFileUri.getBucketName();
+ visitFiles(gcsFileUri, recursive, blob -> {
+ if (!blob.getName().equals(prefix)) {
+ // Note: isDirectory flag is only set when listing with BlobListOption.currentDirectory() i.e non-recursively.
+ // For simplicity, we check if a path is directory by checking if it ends with '/', as done in S3PinotFS.
+ boolean isDirectory = blob.getName().endsWith(GcsUri.DELIMITER);
+ FileMetadata.Builder fileBuilder =
+ new FileMetadata.Builder().setFilePath(GcsUri.createGcsUri(bucketName, blob.getName()).toString())
+ .setLength(blob.getSize()).setIsDirectory(isDirectory);
+ if (!isDirectory) {
+ // Note: if it's a directory, updateTime is set to null, and calling this getter leads to NPE.
+ // public Long getUpdateTime() { return updateTime; }. So skip this for directory.
+ fileBuilder.setLastModifiedTime(blob.getUpdateTime());
+ }
+ listBuilder.add(fileBuilder.build());
+ }
+ });
+ ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+ LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), gcsFileUri, recursive);
+ return listedFiles;
}
@Override
@@ -299,10 +344,9 @@ public class GcsPinotFS extends BasePinotFS {
return isEmpty;
}
- private String[] listFiles(GcsUri fileUri, boolean recursive)
+ private void visitFiles(GcsUri fileUri, boolean recursive, Consumer<Blob> visitor)
throws IOException {
try {
- ImmutableList.Builder<String> builder = ImmutableList.builder();
String prefix = fileUri.getPrefix();
Page<Blob> page;
if (recursive) {
@@ -311,14 +355,7 @@ public class GcsPinotFS extends BasePinotFS {
page = _storage.list(fileUri.getBucketName(), Storage.BlobListOption.prefix(prefix),
Storage.BlobListOption.currentDirectory());
}
- page.iterateAll().forEach(blob -> {
- if (!blob.getName().equals(prefix)) {
- builder.add(GcsUri.createGcsUri(fileUri.getBucketName(), blob.getName()).toString());
- }
- });
- String[] listedFiles = builder.build().toArray(new String[0]);
- LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
- return listedFiles;
+ page.iterateAll().forEach(visitor);
} catch (Exception t) {
throw new IOException(t);
}
@@ -423,7 +460,7 @@ public class GcsPinotFS extends BasePinotFS {
mkdir(dstUri.getUri());
}
boolean copySucceeded = true;
- for (String directoryEntry : listFiles(srcUri, true)) {
+ for (String directoryEntry : listFilesFromGcsUri(srcUri, true)) {
GcsUri srcFile = new GcsUri(URI.create(directoryEntry));
String relativeSrcPath = srcUri.relativize(srcFile);
GcsUri dstFile = dstUri.resolve(relativeSrcPath);
diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsPinotFS.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
similarity index 74%
rename from pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsPinotFS.java
rename to pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
index 17bbb38b26..bcde7b1556 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSTest.java
@@ -32,9 +32,12 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -69,7 +72,7 @@ import static org.testng.Assert.assertTrue;
* If credentials are not supplied then all tests are skipped.
*/
@Test(singleThreaded = true)
-public class TestGcsPinotFS {
+public class GcsPinotFSTest {
private static final String DATA_DIR_PREFIX = "testing-data";
private GcsPinotFS _pinotFS;
@@ -223,8 +226,8 @@ public class TestGcsPinotFS {
String directoryName = Paths.get(gcsDirectoryUri.getPath()).getFileName().toString();
String directoryCopyName = Paths.get(gcsDirectoryUriCopy.getPath()).getFileName().toString();
for (GcsUri element : ImmutableList.copyOf(expectedElements)) {
- expectedElementsCopy.add(
- createGcsUri(element.getBucketName(), element.getPath().replace(directoryName, directoryCopyName)));
+ expectedElementsCopy
+ .add(createGcsUri(element.getBucketName(), element.getPath().replace(directoryName, directoryCopyName)));
}
expectedElementsCopy.addAll(expectedElements);
assertEquals(listFilesToStream(_dataDir).collect(toSet()), expectedElementsCopy);
@@ -243,4 +246,64 @@ public class TestGcsPinotFS {
_pinotFS.move(movedFileGcsUri.getUri(), nonEmptyFileGcsUri.getUri(), false);
assertTrue(listFilesToStream(gcsDirectoryUri).anyMatch(uri -> uri.equals(nonEmptyFileGcsUri)));
}
+
+ @Test
+ public void testListFilesWithMetadata()
+ throws Exception {
+ skipIfNotConfigured();
+
+ // Create empty file
+ Path localTmpDir = createLocalTempDirectory();
+ Path emptyFile = localTmpDir.resolve("empty");
+ emptyFile.toFile().createNewFile();
+
+ // Create 5 subfolders with files inside.
+ int count = 5;
+ Set<String> expectedNonRecursive = new HashSet<>();
+ Set<String> expectedRecursive = new HashSet<>();
+ for (int i = 0; i < count; i++) {
+ GcsUri testDir = _dataDir.resolve("testDir" + i);
+ _pinotFS.mkdir(testDir.getUri());
+ expectedNonRecursive.add(appendSlash(testDir).toString());
+
+ GcsUri testFile = testDir.resolve("testFile" + i);
+ // Create the file by copying an empty file there.
+ _pinotFS.copyFromLocalFile(emptyFile.toFile(), testFile.getUri());
+ expectedRecursive.add(appendSlash(testDir).toString());
+ expectedRecursive.add(testFile.toString());
+ }
+ GcsUri testDirEmpty = _dataDir.resolve("testDirEmpty");
+ _pinotFS.mkdir(testDirEmpty.getUri());
+ expectedNonRecursive.add(appendSlash(testDirEmpty).toString());
+ expectedRecursive.add(appendSlash(testDirEmpty).toString());
+
+ GcsUri testRootFile = _dataDir.resolve("testRootFile");
+ _pinotFS.copyFromLocalFile(emptyFile.toFile(), testRootFile.getUri());
+ expectedNonRecursive.add(testRootFile.toString());
+ expectedRecursive.add(testRootFile.toString());
+
+ // Assert that recursive list files and nonrecursive list files are as expected
+ String[] files = _pinotFS.listFiles(_dataDir.getUri(), false);
+ Assert.assertEquals(files.length, count + 2);
+ Assert.assertTrue(expectedNonRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+ files = _pinotFS.listFiles(_dataDir.getUri(), true);
+ Assert.assertEquals(files.length, count * 2 + 2);
+ Assert.assertTrue(expectedRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+ // Assert that recursive list files and nonrecursive list files with file info are as expected
+ List<FileMetadata> fileMetadata = _pinotFS.listFilesWithMetadata(_dataDir.getUri(), false);
+ Assert.assertEquals(fileMetadata.size(), count + 2);
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+ Assert.assertTrue(expectedNonRecursive
+ .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ fileMetadata = _pinotFS.listFilesWithMetadata(_dataDir.getUri(), true);
+ Assert.assertEquals(fileMetadata.size(), count * 2 + 2);
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), count + 1);
+ Assert.assertTrue(
+ expectedRecursive.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ }
}
diff --git a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsUri.java b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsUriTest.java
similarity index 99%
rename from pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsUri.java
rename to pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsUriTest.java
index 355a32ab46..73b2bf5982 100644
--- a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsUri.java
+++ b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsUriTest.java
@@ -27,7 +27,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
-public class TestGcsUri {
+public class GcsUriTest {
@Test
public void testCreateGcsUriUsingADifferentScheme() {
URI uri = URI.create("file://bucket/file");
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index f9cccb7d12..e2e366dfe3 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -34,9 +34,11 @@ import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -435,8 +437,48 @@ public class S3PinotFS extends BasePinotFS {
@Override
public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ visitFiles(fileUri, recursive, s3Object -> {
+ // TODO: Looks like S3PinotFS filters out directories, inconsistent with the other implementations.
+ // Only add files and not directories
+ if (!s3Object.key().equals(fileUri.getPath()) && !s3Object.key().endsWith(DELIMITER)) {
+ builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object));
+ }
+ });
+ String[] listedFiles = builder.build().toArray(new String[0]);
+ LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
+ return listedFiles;
+ }
+
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+ throws IOException {
+ ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+ visitFiles(fileUri, recursive, s3Object -> {
+ if (!s3Object.key().equals(fileUri.getPath())) {
+ FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
+ .setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
+ .setLastModifiedTime(s3Object.lastModified().toEpochMilli()).setLength(s3Object.size())
+ .setIsDirectory(s3Object.key().endsWith(DELIMITER));
+ listBuilder.add(fileBuilder.build());
+ }
+ });
+ ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+ LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
+ return listedFiles;
+ }
+
+ private static String getNormalizedFileKey(S3Object s3Object) {
+ String fileKey = s3Object.key();
+ if (fileKey.startsWith(DELIMITER)) {
+ fileKey = fileKey.substring(1);
+ }
+ return fileKey;
+ }
+
+ private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object> visitor)
+ throws IOException {
try {
- ImmutableList.Builder<String> builder = ImmutableList.builder();
String continuationToken = null;
boolean isDone = false;
String prefix = normalizeToDirectoryPrefix(fileUri);
@@ -457,22 +499,10 @@ public class S3PinotFS extends BasePinotFS {
ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
LOGGER.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
- filesReturned.stream().forEach(object -> {
- //Only add files and not directories
- if (!object.key().equals(fileUri.getPath()) && !object.key().endsWith(DELIMITER)) {
- String fileKey = object.key();
- if (fileKey.startsWith(DELIMITER)) {
- fileKey = fileKey.substring(1);
- }
- builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
- }
- });
+ filesReturned.forEach(visitor);
isDone = !listObjectsV2Response.isTruncated();
continuationToken = listObjectsV2Response.nextContinuationToken();
}
- String[] listedFiles = builder.build().toArray(new String[0]);
- LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
- return listedFiles;
} catch (Throwable t) {
throw new IOException(t);
}
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index 71f3794d61..b877c29e91 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -27,7 +27,9 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -143,8 +145,12 @@ public class S3PinotFSTest {
for (String fileName : originalFiles) {
createEmptyFile(folder, fileName);
}
+ // Files in sub folders should be skipped.
+ createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
+ createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
+ Assert.assertEquals(actualFiles.length, originalFiles.length);
actualFiles = Arrays.stream(actualFiles).filter(x -> x.contains("list-2")).toArray(String[]::new);
Assert.assertEquals(actualFiles.length, originalFiles.length);
@@ -158,24 +164,72 @@ public class S3PinotFSTest {
public void testListFilesInFolderRecursive()
throws Exception {
String folder = "list-files-rec";
- String[] nestedFolders = new String[]{"list-files-child-1", "list-files-child-2"};
+ String[] nestedFolders = new String[]{"", "list-files-child-1", "list-files-child-2"};
String[] originalFiles = new String[]{"a-list-3.txt", "b-list-3.txt", "c-list-3.txt"};
List<String> expectedResultList = new ArrayList<>();
for (String childFolder : nestedFolders) {
- String folderName = folder + DELIMITER + childFolder;
+ String folderName = folder + (childFolder.length() == 0 ? "" : DELIMITER + childFolder);
for (String fileName : originalFiles) {
createEmptyFile(folderName, fileName);
expectedResultList.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName + DELIMITER + fileName));
}
}
String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), true);
-
+ Assert.assertEquals(actualFiles.length, expectedResultList.size());
actualFiles = Arrays.stream(actualFiles).filter(x -> x.contains("list-3")).toArray(String[]::new);
Assert.assertEquals(actualFiles.length, expectedResultList.size());
Assert.assertTrue(Arrays.equals(expectedResultList.toArray(), actualFiles));
}
+ @Test
+ public void testListFilesWithMetadataInFolderNonRecursive()
+ throws Exception {
+ String folder = "list-files-with-md";
+ String[] originalFiles = new String[]{"a-list-2.txt", "b-list-2.txt", "c-list-2.txt"};
+ for (String fileName : originalFiles) {
+ createEmptyFile(folder, fileName);
+ }
+ // Files in sub folders should be skipped.
+ createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
+ createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
+ List<FileMetadata> actualFiles =
+ _s3PinotFS.listFilesWithMetadata(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
+ Assert.assertEquals(actualFiles.size(), originalFiles.length);
+ List<String> actualFilePaths =
+ actualFiles.stream().map(FileMetadata::getFilePath).filter(fp -> fp.contains("list-2"))
+ .collect(Collectors.toList());
+ Assert.assertEquals(actualFilePaths.size(), originalFiles.length);
+ Assert.assertEquals(Arrays.stream(originalFiles)
+ .map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName))
+ .collect(Collectors.toList()), actualFilePaths);
+ }
+
+ @Test
+ public void testListFilesWithMetadataInFolderRecursive()
+ throws Exception {
+ String folder = "list-files-rec-with-md";
+ String[] nestedFolders = new String[]{"", "list-files-child-1", "list-files-child-2"};
+ String[] originalFiles = new String[]{"a-list-3.txt", "b-list-3.txt", "c-list-3.txt"};
+
+ List<String> expectedResultList = new ArrayList<>();
+ for (String childFolder : nestedFolders) {
+ String folderName = folder + (childFolder.length() == 0 ? "" : DELIMITER + childFolder);
+ for (String fileName : originalFiles) {
+ createEmptyFile(folderName, fileName);
+ expectedResultList.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName + DELIMITER + fileName));
+ }
+ }
+ List<FileMetadata> actualFiles =
+ _s3PinotFS.listFilesWithMetadata(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), true);
+ Assert.assertEquals(actualFiles.size(), expectedResultList.size());
+ List<String> actualFilePaths =
+ actualFiles.stream().map(FileMetadata::getFilePath).filter(fp -> fp.contains("list-3"))
+ .collect(Collectors.toList());
+ Assert.assertEquals(actualFilePaths.size(), expectedResultList.size());
+ Assert.assertEquals(expectedResultList, actualFilePaths);
+ }
+
@Test
public void testDeleteFile()
throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
new file mode 100644
index 0000000000..4def90b6de
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.filesystem;
+
+/**
+ * Immutable file path plus attributes (last-modified time, length, directory flag) gathered while listing files.
+ */
+public class FileMetadata {
+ private final String _filePath;
+ private final long _lastModifiedTime;
+ private final long _length;
+ private final boolean _isDirectory;
+
+ private FileMetadata(String filePath, long lastModifiedTime, long length, boolean isDirectory) {
+ _filePath = filePath;
+ _lastModifiedTime = lastModifiedTime;
+ _length = length;
+ _isDirectory = isDirectory;
+ }
+
+ public String getFilePath() {
+ return _filePath;
+ }
+
+ public long getLastModifiedTime() {
+ return _lastModifiedTime;
+ }
+
+ public long getLength() {
+ return _length;
+ }
+
+ public boolean isDirectory() {
+ return _isDirectory;
+ }
+
+ @Override
+ public String toString() {
+ return "FileMetadata{" + "_filePath='" + _filePath + '\'' + ", _lastModifiedTime=" + _lastModifiedTime
+ + ", _length=" + _length + ", _isDirectory=" + _isDirectory + '}';
+ }
+
+ public static class Builder {
+ private String _filePath;
+ private long _lastModifiedTime;
+ private long _length;
+ private boolean _isDirectory;
+
+ public Builder setFilePath(String filePath) {
+ _filePath = filePath;
+ return this;
+ }
+
+ public Builder setLastModifiedTime(long lastModifiedTime) {
+ _lastModifiedTime = lastModifiedTime;
+ return this;
+ }
+
+ public Builder setLength(long length) {
+ _length = length;
+ return this;
+ }
+
+ public Builder setIsDirectory(boolean isDirectory) {
+ _isDirectory = isDirectory;
+ return this;
+ }
+
+ public FileMetadata build() {
+ return new FileMetadata(_filePath, _lastModifiedTime, _length, _isDirectory);
+ }
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
index 0dcbd00818..ed6c8f2143 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
@@ -30,6 +30,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -115,6 +117,23 @@ public class LocalPinotFS extends BasePinotFS {
}
}
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+ throws IOException {
+ // Depth 1 visits only direct children; the stream is closed to release the directory handles it holds.
+ Path root = toFile(fileUri).toPath();
+ try (java.util.stream.Stream<Path> paths = Files.walk(root, recursive ? Integer.MAX_VALUE : 1)) {
+ return paths.filter(p -> !p.equals(root)).map(p -> getFileInfo(p.toFile())).collect(Collectors.toList());
+ } catch (java.io.UncheckedIOException e) {
+ throw e.getCause(); // Files.walk wraps traversal failures; surface them as the declared IOException.
+ }
+ }
+
+ private static FileMetadata getFileInfo(File file) {
+ return new FileMetadata.Builder().setFilePath(file.getAbsolutePath()).setLastModifiedTime(file.lastModified())
+ .setLength(file.length()).setIsDirectory(file.isDirectory()).build();
+ }
+
@Override
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 3ad2243a30..286a02c7d4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
+import java.util.List;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -156,6 +157,19 @@ public interface PinotFS extends Closeable, Serializable {
String[] listFiles(URI fileUri, boolean recursive)
throws IOException;
+ /**
+ * Like {@link #listFiles(URI, boolean)}, but also returns each entry's length, directory flag and last-modified
+ * time, sparing callers separate length()/isDirectory()/lastModified() calls (costly on remote file systems).
+ * @param fileUri location of the directory to list
+ * @param recursive whether to list files recursively
+ * @return a list of FileMetadata, each holding an entry's file path and metadata
+ * @throws IOException on IO failure. See specific implementation
+ */
+ default List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+ throws IOException {
+ throw new UnsupportedOperationException("listFilesWithMetadata is not supported by " + getClass().getName());
+ }
+
/**
* Copies a file from a remote filesystem to the local one. Keeps the original file.
* @param srcUri location of current file on remote filesystem (must not be a directory)
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
index f7bff51d43..7b3338bd11 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
@@ -21,6 +21,10 @@ package org.apache.pinot.spi.filesystem;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -250,4 +254,60 @@ public class LocalPinotFSTest {
localPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
}
+
+ @Test
+ public void testListFilesWithMetadata()
+ throws IOException {
+ LocalPinotFS localPinotFS = new LocalPinotFS();
+ File tempDirPath = new File(_absoluteTmpDirPath, "test-list-files-with-md");
+ Assert.assertTrue(tempDirPath.mkdir());
+
+ // Create a testDir and file underneath directory
+ int count = 5;
+ List<String> expectedNonRecursive = new ArrayList<>();
+ List<String> expectedRecursive = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ File testDir = new File(tempDirPath, "testDir" + i);
+ Assert.assertTrue(testDir.mkdir());
+ expectedNonRecursive.add(testDir.getAbsolutePath());
+
+ File testFile = new File(testDir, "testFile" + i);
+ Assert.assertTrue(testFile.createNewFile());
+ expectedRecursive.add(testDir.getAbsolutePath());
+ expectedRecursive.add(testFile.getAbsolutePath());
+ }
+ File testDirEmpty = new File(tempDirPath, "testDirEmpty");
+ Assert.assertTrue(testDirEmpty.mkdir());
+ expectedNonRecursive.add(testDirEmpty.getAbsolutePath());
+ expectedRecursive.add(testDirEmpty.getAbsolutePath());
+
+ File testRootFile = new File(tempDirPath, "testRootFile");
+ Assert.assertTrue(testRootFile.createNewFile());
+ expectedNonRecursive.add(testRootFile.getAbsolutePath());
+ expectedRecursive.add(testRootFile.getAbsolutePath());
+
+ // Assert that recursive list files and nonrecursive list files are as expected
+ String[] files = localPinotFS.listFiles(tempDirPath.toURI(), false);
+ Assert.assertEquals(files.length, count + 2);
+ Assert.assertTrue(expectedNonRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+ files = localPinotFS.listFiles(tempDirPath.toURI(), true);
+ Assert.assertEquals(files.length, count * 2 + 2);
+ Assert.assertTrue(expectedRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+ // Assert that recursive list files and nonrecursive list files with file info are as expected
+ List<FileMetadata> fileMetadata = localPinotFS.listFilesWithMetadata(tempDirPath.toURI(), false);
+ Assert.assertEquals(fileMetadata.size(), count + 2);
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+ Assert.assertTrue(expectedNonRecursive
+ .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ fileMetadata = localPinotFS.listFilesWithMetadata(tempDirPath.toURI(), true);
+ Assert.assertEquals(fileMetadata.size(), count * 2 + 2);
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), count + 1);
+ Assert.assertTrue(
+ expectedRecursive.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org