You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/09/28 17:01:25 UTC
[pinot] branch master updated: extend PinotFS impls with listFilesWithMetadata and some bugfix (#9478)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 550cb0233a extend PinotFS impls with listFilesWithMetadata and some bugfix (#9478)
550cb0233a is described below
commit 550cb0233a9bbfa4a58b83e542f5e928bbcc2d43
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Sep 28 10:01:17 2022 -0700
extend PinotFS impls with listFilesWithMetadata and some bugfix (#9478)
---
.../pinot/plugin/filesystem/ADLSGen2PinotFS.java | 54 +++++++----
.../pinot/plugin/filesystem/AzurePinotFS.java | 65 +++++++++-----
.../filesystem/test/ADLSGen2PinotFSTest.java | 48 ++++++++++
.../plugin/filesystem/test/AzurePinotFSTest.java | 100 +++++++++++++++++++++
.../pinot/plugin/filesystem/HadoopPinotFS.java | 51 +++++++----
.../pinot/plugin/filesystem/HadoopPinotFSTest.java | 62 +++++++++++++
.../apache/pinot/spi/filesystem/FileMetadata.java | 4 +-
.../apache/pinot/spi/filesystem/LocalPinotFS.java | 6 +-
8 files changed, 330 insertions(+), 60 deletions(-)
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index 3a3a684b21..a09220f60d 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -56,14 +56,15 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.sql.Timestamp;
import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -182,9 +183,9 @@ public class ADLSGen2PinotFS extends BasePinotFS {
Preconditions.checkNotNull(proxyPassword, "Proxy Password cannot be null");
NettyAsyncHttpClientBuilder builder = new NettyAsyncHttpClientBuilder();
- builder.proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
- new InetSocketAddress(proxyHost, Integer.parseInt(proxyPort))).setCredentials(proxyUsername,
- proxyPassword));
+ builder.proxy(
+ new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress(proxyHost, Integer.parseInt(proxyPort)))
+ .setCredentials(proxyUsername, proxyPassword));
ClientSecretCredentialBuilder clientSecretCredentialBuilder =
new ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId);
clientSecretCredentialBuilder.httpClient(builder.build());
@@ -245,8 +246,9 @@ public class ADLSGen2PinotFS extends BasePinotFS {
// By default, create directory call will overwrite if the path already exists. Setting IfNoneMatch = "*" to
// prevent overwrite. https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
DataLakeRequestConditions requestConditions = new DataLakeRequestConditions().setIfNoneMatch("*");
- _fileSystemClient.createDirectoryWithResponse(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(uri), null,
- null, null, null, requestConditions, null, null);
+ _fileSystemClient
+ .createDirectoryWithResponse(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(uri), null, null, null,
+ null, requestConditions, null, null);
return true;
} catch (DataLakeStorageException e) {
// If the path already exists, doing nothing and return true
@@ -414,12 +416,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
throws IOException {
LOGGER.debug("listFiles is called with fileUri='{}', recursive='{}'", fileUri, recursive);
try {
- // Unlike other Azure SDK APIs that takes url encoded path, ListPathsOptions takes decoded url
- // e.g) 'path/segment' instead of 'path%2Fsegment'
- String pathForListPathsOptions =
- Utility.urlDecode(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(fileUri));
- ListPathsOptions options = new ListPathsOptions().setPath(pathForListPathsOptions).setRecursive(recursive);
- PagedIterable<PathItem> iter = _fileSystemClient.listPaths(options, null);
+ PagedIterable<PathItem> iter = listPathItems(fileUri, recursive);
return iter.stream().map(p -> AzurePinotFSUtil.convertAzureStylePathToUriStylePath(p.getName()))
.toArray(String[]::new);
} catch (DataLakeStorageException e) {
@@ -427,6 +424,34 @@ public class ADLSGen2PinotFS extends BasePinotFS {
}
}
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+ throws IOException {
+ LOGGER.debug("listFilesWithMetadata is called with fileUri='{}', recursive='{}'", fileUri, recursive);
+ try {
+ PagedIterable<PathItem> iter = listPathItems(fileUri, recursive);
+ return iter.stream().map(ADLSGen2PinotFS::getFileMetadata).collect(Collectors.toList());
+ } catch (DataLakeStorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private PagedIterable<PathItem> listPathItems(URI fileUri, boolean recursive)
+ throws IOException {
+ // Unlike other Azure SDK APIs that take a URL-encoded path, ListPathsOptions takes a decoded URL,
+ // e.g. 'path/segment' instead of 'path%2Fsegment'
+ String pathForListPathsOptions = Utility.urlDecode(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(fileUri));
+ ListPathsOptions options = new ListPathsOptions().setPath(pathForListPathsOptions).setRecursive(recursive);
+ return _fileSystemClient.listPaths(options, null);
+ }
+
+ private static FileMetadata getFileMetadata(PathItem file) {
+ String path = AzurePinotFSUtil.convertAzureStylePathToUriStylePath(file.getName());
+ return new FileMetadata.Builder().setFilePath(path)
+ .setLastModifiedTime(file.getLastModified().toInstant().toEpochMilli()).setLength(file.getContentLength())
+ .setIsDirectory(file.isDirectory()).build();
+ }
+
/**
* Copy a file from ADL to local location.
*
@@ -520,8 +545,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
try {
PathProperties pathProperties = getPathProperties(uri);
OffsetDateTime offsetDateTime = pathProperties.getLastModified();
- Timestamp timestamp = Timestamp.valueOf(offsetDateTime.atZoneSameInstant(ZoneOffset.UTC).toLocalDateTime());
- return timestamp.getTime();
+ return offsetDateTime.toInstant().toEpochMilli();
} catch (DataLakeStorageException e) {
throw new IOException("Failed while checking lastModified time for : " + uri, e);
}
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java
index c446f8b91e..1c6b70bf20 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/AzurePinotFS.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.filesystem;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.DirectoryEntry;
import com.microsoft.azure.datalake.store.DirectoryEntryType;
@@ -35,13 +36,15 @@ import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,36 +161,54 @@ public class AzurePinotFS extends BasePinotFS {
if (rootDir == null) {
return EMPTY_ARR;
}
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ visitFiles(fileUri, recursive, f -> builder.add(f.fullName));
+ String[] listedFiles = builder.build().toArray(new String[0]);
+ LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
+ return listedFiles;
+ }
- if (!recursive) {
- List<DirectoryEntry> shallowDirectoryEntries = _adlStoreClient.enumerateDirectory(rootDir.fullName);
- List<String> shallowDirPaths = new ArrayList<>(shallowDirectoryEntries.size());
- for (DirectoryEntry directoryEntry : shallowDirectoryEntries) {
- shallowDirPaths.add(directoryEntry.fullName);
- }
- return shallowDirPaths.toArray(new String[shallowDirPaths.size()]);
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+ throws IOException {
+ DirectoryEntry rootDir = _adlStoreClient.getDirectoryEntry(fileUri.getPath());
+ if (rootDir == null) {
+ return Collections.emptyList();
}
+ ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+ visitFiles(fileUri, recursive, f -> {
+ FileMetadata.Builder fileBuilder =
+ new FileMetadata.Builder().setFilePath(f.fullName).setLastModifiedTime(f.lastModifiedTime.getTime())
+ .setLength(f.length).setIsDirectory(f.type.equals(DirectoryEntryType.DIRECTORY));
+ listBuilder.add(fileBuilder.build());
+ });
+ ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+ LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
+ return listedFiles;
+ }
- List<DirectoryEntry> directoryEntries = listFiles(rootDir);
- List<String> fullFilePaths = new ArrayList<>(directoryEntries.size());
- for (DirectoryEntry directoryEntry : directoryEntries) {
- fullFilePaths.add(directoryEntry.fullName);
+ private void visitFiles(URI fileUri, boolean recursive, Consumer<DirectoryEntry> visitor)
+ throws IOException {
+ DirectoryEntry rootDir = _adlStoreClient.getDirectoryEntry(fileUri.getPath());
+ if (rootDir == null) {
+ throw new IllegalArgumentException("fileUri does not exist: " + fileUri);
+ }
+ if (!recursive) {
+ _adlStoreClient.enumerateDirectory(rootDir.fullName).forEach(visitor);
+ } else {
+ visitFilesRecursively(rootDir, visitor);
}
- return fullFilePaths.toArray(new String[fullFilePaths.size()]);
}
- private List<DirectoryEntry> listFiles(DirectoryEntry origDirEntry)
+ private void visitFilesRecursively(DirectoryEntry origDirEntry, Consumer<DirectoryEntry> visitor)
throws IOException {
- List<DirectoryEntry> fileList = new ArrayList<>();
- if (origDirEntry.type.equals(DirectoryEntryType.DIRECTORY)) {
- for (DirectoryEntry directoryEntry : _adlStoreClient.enumerateDirectory(origDirEntry.fullName)) {
- fileList.add(directoryEntry);
- fileList.addAll(listFiles(directoryEntry));
+ for (DirectoryEntry directoryEntry : _adlStoreClient.enumerateDirectory(origDirEntry.fullName)) {
+ System.out.println("directoryEntry:" + directoryEntry.fullName);
+ visitor.accept(directoryEntry);
+ if (directoryEntry.type.equals(DirectoryEntryType.DIRECTORY)) {
+ visitFilesRecursively(directoryEntry, visitor);
}
- } else {
- fileList.add(origDirEntry);
}
- return fileList;
}
@Override
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
index 6a4ec9b9fd..976817dd64 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
@@ -36,10 +36,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -201,6 +206,49 @@ public class ADLSGen2PinotFSTest {
verify(_mockPathItem).getName();
}
+ @Test
+ public void testListFilesWithMetadata()
+ throws IOException {
+ when(_mockFileSystemClient.listPaths(any(), any())).thenReturn(_mockPagedIterable);
+ when(_mockPagedIterable.stream()).thenReturn(Collections.singletonList(_mockPathItem).stream());
+ when(_mockPathItem.getName()).thenReturn("foo");
+ when(_mockPathItem.isDirectory()).thenReturn(false);
+ when(_mockPathItem.getContentLength()).thenReturn(1024L);
+ OffsetDateTime mtime = OffsetDateTime.now();
+ when(_mockPathItem.getLastModified()).thenReturn(mtime);
+
+ List<FileMetadata> actual = _adlsGen2PinotFsUnderTest.listFilesWithMetadata(_mockURI, true);
+ FileMetadata fm = actual.get(0);
+ Assert.assertEquals(fm.getFilePath(), "/foo");
+ Assert.assertFalse(fm.isDirectory());
+ Assert.assertEquals(fm.getLength(), 1024);
+ Assert.assertEquals(fm.getLastModifiedTime(), mtime.toInstant().toEpochMilli());
+
+ verify(_mockFileSystemClient).listPaths(any(), any());
+ verify(_mockPagedIterable).stream();
+ verify(_mockPathItem).getName();
+ verify(_mockPathItem).isDirectory();
+ verify(_mockPathItem).getContentLength();
+ verify(_mockPathItem).getLastModified();
+ }
+
+ @Test
+ public void testLastModified()
+ throws IOException {
+ when(_mockFileSystemClient.getDirectoryClient(any())).thenReturn(_mockDirectoryClient);
+ when(_mockDirectoryClient.getProperties()).thenReturn(_mockPathProperties);
+ Instant now = Instant.now();
+ OffsetDateTime mtime = Instant.now().atOffset(ZoneOffset.UTC);
+ when(_mockPathProperties.getLastModified()).thenReturn(mtime);
+
+ long actual = _adlsGen2PinotFsUnderTest.lastModified(_mockURI);
+ Assert.assertEquals(actual, now.toEpochMilli());
+
+ verify(_mockFileSystemClient).getDirectoryClient(any());
+ verify(_mockDirectoryClient).getProperties();
+ verify(_mockPathProperties).getLastModified();
+ }
+
@Test
public void testListFilesException() {
when(_mockFileSystemClient.listPaths(any(), any())).thenThrow(_mockDataLakeStorageException);
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java
index 1c3be6d4f5..61a12f6d42 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/AzurePinotFSTest.java
@@ -20,11 +20,22 @@ package org.apache.pinot.plugin.filesystem.test;
import com.microsoft.azure.datalake.store.ADLFileInputStream;
import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.DirectoryEntry;
+import com.microsoft.azure.datalake.store.DirectoryEntryType;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.filesystem.AzurePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -77,6 +88,95 @@ public class AzurePinotFSTest {
Assert.assertTrue(file.exists());
}
+ @Test
+ public void testListFilesWithMetadataNonRecursive()
+ throws IOException {
+ ADLStoreClient adlStoreClient = Mockito.mock(ADLStoreClient.class);
+ AzurePinotFS azurePinotFS = new AzurePinotFS(adlStoreClient);
+ URI baseURI = new File(_adlLocation, "testDirNonRecursive").toURI();
+
+ Date lastModifiedTime = new Date();
+ Date lastAccessTime = new Date();
+ DirectoryEntry testDir =
+ new DirectoryEntry("testDir", baseURI.toString(), 1024, null, null, lastAccessTime, lastModifiedTime,
+ DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+ Mockito.when(adlStoreClient.getDirectoryEntry(baseURI.getPath())).thenReturn(testDir);
+
+ List<DirectoryEntry> entries = new ArrayList<>();
+ DirectoryEntry dir =
+ new DirectoryEntry("dir1", testDir.fullName + "/dir1", 128, null, null, lastAccessTime, lastModifiedTime,
+ DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+ entries.add(dir);
+ DirectoryEntry file =
+ new DirectoryEntry("file1", testDir.fullName + "/file1", 1024, null, null, lastAccessTime, lastModifiedTime,
+ DirectoryEntryType.FILE, 128, 1, null, false, null);
+ entries.add(file);
+ Mockito.when(adlStoreClient.enumerateDirectory(Mockito.anyString())).thenReturn(entries);
+
+ String[] files = azurePinotFS.listFiles(baseURI, false);
+ Assert.assertEquals(files.length, 2);
+ Assert
+ .assertTrue(entries.stream().map(d -> d.fullName).collect(Collectors.toSet()).containsAll(Arrays.asList(files)),
+ Arrays.toString(files));
+
+ List<FileMetadata> fileMetadata = azurePinotFS.listFilesWithMetadata(baseURI, false);
+ Assert.assertEquals(fileMetadata.size(), 2);
+ Assert.assertTrue(entries.stream().map(d -> d.fullName).collect(Collectors.toSet())
+ .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), 1);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+ }
+
+ @Test
+ public void testListFilesWithMetadataRecursive()
+ throws IOException {
+ ADLStoreClient adlStoreClient = Mockito.mock(ADLStoreClient.class);
+ AzurePinotFS azurePinotFS = new AzurePinotFS(adlStoreClient);
+ URI baseURI = new File(_adlLocation, "testDirRecursive").toURI();
+
+ Date lastModifiedTime = new Date();
+ Date lastAccessTime = new Date();
+ DirectoryEntry testDir =
+ new DirectoryEntry("testDir", baseURI.toString(), 128, null, null, lastAccessTime, lastModifiedTime,
+ DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+ Mockito.when(adlStoreClient.getDirectoryEntry(baseURI.getPath())).thenReturn(testDir);
+
+ Set<String> expected = new HashSet<>();
+ List<DirectoryEntry> dirEntries = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ DirectoryEntry dir = new DirectoryEntry("dir" + i, testDir.fullName + "/dir" + i, 128, null, null, lastAccessTime,
+ lastModifiedTime, DirectoryEntryType.DIRECTORY, 128, 1, null, false, null);
+ dirEntries.add(dir);
+ expected.add(dir.fullName);
+ }
+ List<DirectoryEntry> fileEntries = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ DirectoryEntry file =
+ new DirectoryEntry("file" + i, testDir.fullName + "/dir2/file" + i, 1024, null, null, lastAccessTime,
+ lastModifiedTime, DirectoryEntryType.FILE, 128, 1, null, false, null);
+ fileEntries.add(file);
+ expected.add(file.fullName);
+ }
+
+ // Prepare the mock for calling listFiles() and listFilesWithMetadata().
+ Mockito.when(adlStoreClient.enumerateDirectory(Mockito.anyString()))
+ .thenReturn(dirEntries, Collections.emptyList(), fileEntries /* for listFiles() */, dirEntries,
+ Collections.emptyList(), fileEntries /* for listFilesWithMetadata() */);
+
+ String[] files = azurePinotFS.listFiles(baseURI, true);
+ Assert.assertEquals(files.length, 5);
+ Assert.assertTrue(expected.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+ List<FileMetadata> fileMetadata = azurePinotFS.listFilesWithMetadata(baseURI, true);
+ Assert.assertEquals(fileMetadata.size(), 5);
+ Assert.assertTrue(
+ expected.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), 2);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 3);
+ }
+
@AfterClass
public void tearDown() {
new File(_adlLocation).delete();
diff --git a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
index 5de0ceea33..f65ec1945f 100644
--- a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
@@ -20,13 +20,14 @@
package org.apache.pinot.plugin.filesystem;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
-import java.util.ArrayList;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,34 +142,47 @@ public class HadoopPinotFS extends BasePinotFS {
@Override
public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
- ArrayList<String> filePathStrings = new ArrayList<>();
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ visitFiles(fileUri, recursive, f -> builder.add(f.getPath().toString()));
+ String[] listedFiles = builder.build().toArray(new String[0]);
+ LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
+ return listedFiles;
+ }
+
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
+ throws IOException {
+ ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
+ visitFiles(fileUri, recursive, f -> {
+ FileMetadata.Builder fileBuilder =
+ new FileMetadata.Builder().setFilePath(f.getPath().toString()).setLastModifiedTime(f.getModificationTime())
+ .setLength(f.getLen()).setIsDirectory(f.isDirectory());
+ listBuilder.add(fileBuilder.build());
+ });
+ ImmutableList<FileMetadata> listedFiles = listBuilder.build();
+ LOGGER.debug("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
+ return listedFiles;
+ }
+
+ private void visitFiles(URI fileUri, boolean recursive, Consumer<FileStatus> visitor)
+ throws IOException {
Path path = new Path(fileUri);
- if (_hadoopFS.exists(path)) {
- // _hadoopFS.listFiles(path, false) will not return directories as files, thus use listStatus(path) here.
- List<FileStatus> files = listStatus(path, recursive);
- for (FileStatus file : files) {
- filePathStrings.add(file.getPath().toString());
- }
- } else {
+ if (!_hadoopFS.exists(path)) {
throw new IllegalArgumentException("fileUri does not exist: " + fileUri);
}
- String[] retArray = new String[filePathStrings.size()];
- filePathStrings.toArray(retArray);
- return retArray;
+ visitFileStatus(path, recursive, visitor);
}
- private List<FileStatus> listStatus(Path path, boolean recursive)
+ private void visitFileStatus(Path path, boolean recursive, Consumer<FileStatus> visitor)
throws IOException {
- List<FileStatus> fileStatuses = new ArrayList<>();
+ // _hadoopFS.listFiles(path, false) will not return directories as files, thus use listStatus(path) here.
FileStatus[] files = _hadoopFS.listStatus(path);
for (FileStatus file : files) {
- fileStatuses.add(file);
+ visitor.accept(file);
if (file.isDirectory() && recursive) {
- List<FileStatus> subFiles = listStatus(file.getPath(), true);
- fileStatuses.addAll(subFiles);
+ visitFileStatus(file.getPath(), true, visitor);
}
}
- return fileStatuses;
}
@Override
diff --git a/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
index 7434aeeaf0..2ba85994b4 100644
--- a/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
@@ -21,8 +21,13 @@ package org.apache.pinot.plugin.filesystem;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -52,4 +57,61 @@ public class HadoopPinotFSTest {
hadoopFS.delete(baseURI, true);
}
}
+
+ @Test
+ public void testListFilesWithMetadata()
+ throws IOException {
+ URI baseURI = URI.create(TMP_DIR + "/HadoopPinotFSTestListFiles");
+ try (HadoopPinotFS hadoopFS = new HadoopPinotFS()) {
+ hadoopFS.init(new PinotConfiguration());
+
+ // Create a testDir and file underneath directory
+ int count = 5;
+ List<String> expectedNonRecursive = new ArrayList<>();
+ List<String> expectedRecursive = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ URI testDir = new Path(baseURI.getPath(), "testDir" + i).toUri();
+ hadoopFS.mkdir(testDir);
+ expectedNonRecursive.add((testDir.getScheme() == null ? "file:" : "") + testDir);
+
+ URI testFile = new Path(testDir.getPath(), "testFile" + i).toUri();
+ hadoopFS.touch(testFile);
+ expectedRecursive.add((testDir.getScheme() == null ? "file:" : "") + testDir);
+ expectedRecursive.add((testDir.getScheme() == null ? "file:" : "") + testFile);
+ }
+ URI testDirEmpty = new Path(baseURI.getPath(), "testDirEmpty").toUri();
+ hadoopFS.mkdir(testDirEmpty);
+ expectedNonRecursive.add((testDirEmpty.getScheme() == null ? "file:" : "") + testDirEmpty);
+ expectedRecursive.add((testDirEmpty.getScheme() == null ? "file:" : "") + testDirEmpty);
+
+ URI testRootFile = new Path(baseURI.getPath(), "testRootFile").toUri();
+ hadoopFS.touch(testRootFile);
+ expectedNonRecursive.add((testRootFile.getScheme() == null ? "file:" : "") + testRootFile);
+ expectedRecursive.add((testRootFile.getScheme() == null ? "file:" : "") + testRootFile);
+
+ // Assert that recursive list files and nonrecursive list files are as expected
+ String[] files = hadoopFS.listFiles(baseURI, false);
+ Assert.assertEquals(files.length, count + 2);
+ Assert.assertTrue(expectedNonRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+ files = hadoopFS.listFiles(baseURI, true);
+ Assert.assertEquals(files.length, count * 2 + 2);
+ Assert.assertTrue(expectedRecursive.containsAll(Arrays.asList(files)), Arrays.toString(files));
+
+ // Assert that recursive list files and nonrecursive list files with file info are as expected
+ List<FileMetadata> fileMetadata = hadoopFS.listFilesWithMetadata(baseURI, false);
+ Assert.assertEquals(fileMetadata.size(), count + 2);
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
+ Assert.assertTrue(expectedNonRecursive
+ .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ fileMetadata = hadoopFS.listFilesWithMetadata(baseURI, true);
+ Assert.assertEquals(fileMetadata.size(), count * 2 + 2);
+ Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
+ Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), count + 1);
+ Assert.assertTrue(expectedRecursive
+ .containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
+ fileMetadata.toString());
+ }
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
index 4def90b6de..a5e90b0de4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/FileMetadata.java
@@ -52,8 +52,8 @@ public class FileMetadata {
@Override
public String toString() {
- return "FileInfo{" + "_filePath='" + _filePath + '\'' + ", _lastModifiedTime=" + _lastModifiedTime + ", _length="
- + _length + ", _isDirectory=" + _isDirectory + '}';
+ return "FileMetadata{" + "_filePath='" + _filePath + '\'' + ", _lastModifiedTime=" + _lastModifiedTime
+ + ", _length=" + _length + ", _isDirectory=" + _isDirectory + '}';
}
public static class Builder {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
index ed6c8f2143..5eae4d9267 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
@@ -122,14 +122,14 @@ public class LocalPinotFS extends BasePinotFS {
throws IOException {
File file = toFile(fileUri);
if (!recursive) {
- return Arrays.stream(file.list()).map(s -> getFileInfo(new File(file, s))).collect(Collectors.toList());
+ return Arrays.stream(file.list()).map(s -> getFileMetadata(new File(file, s))).collect(Collectors.toList());
} else {
- return Files.walk(Paths.get(fileUri)).filter(s -> !s.equals(file.toPath())).map(p -> getFileInfo(p.toFile()))
+ return Files.walk(Paths.get(fileUri)).filter(s -> !s.equals(file.toPath())).map(p -> getFileMetadata(p.toFile()))
.collect(Collectors.toList());
}
}
- private static FileMetadata getFileInfo(File file) {
+ private static FileMetadata getFileMetadata(File file) {
return new FileMetadata.Builder().setFilePath(file.getAbsolutePath()).setLastModifiedTime(file.lastModified())
.setLength(file.length()).setIsDirectory(file.isDirectory()).build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org