You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/02/25 19:12:31 UTC
[incubator-pinot] branch master updated: Fix listFiles method in
HadoopPinotFS (#3838)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e3a9184 Fix listFiles method in HadoopPinotFS (#3838)
e3a9184 is described below
commit e3a9184ff5f3cff0cc68630dbeca3968bf07ae4b
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Mon Feb 25 11:12:27 2019 -0800
Fix listFiles method in HadoopPinotFS (#3838)
* Fix listFiles method in HadoopPinotFS
---
.../resources/LLCSegmentCompletionHandlers.java | 3 +++
.../realtime/PinotLLCRealtimeSegmentManager.java | 6 +++++-
.../org/apache/pinot/filesystem/HadoopPinotFS.java | 22 ++++++++++++++++++----
3 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 8b5b36b..0d5de21 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -304,6 +304,9 @@ public class LLCSegmentCompletionHandlers {
ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName));
java.net.URI segmentFileURI;
if (isSplitCommit) {
+ // We only clean up tmp segment file under table dir, so don't create any sub-dir under table dir.
+ // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
+ // TODO: move tmp file logic into SegmentCompletionUtils.
String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
segmentFileURI =
ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5d9d290..6b32400 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -391,8 +391,12 @@ public class PinotLLCRealtimeSegmentManager {
return false;
}
+ // Cleans up tmp segment files under table dir.
+ // We only clean up tmp segment files in table level dir, so there's no need to list recursively.
+ // See LLCSegmentCompletionHandlers.uploadSegment().
+ // TODO: move tmp file logic into SegmentCompletionUtils.
try {
- for (String uri : pinotFS.listFiles(tableDirURI, true)) {
+ for (String uri : pinotFS.listFiles(tableDirURI, false)) {
if (uri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
LOGGER.warn("Deleting " + uri);
pinotFS.delete(new URI(uri), true);
diff --git a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index d680431..1620b5a 100644
--- a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++ b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -138,10 +139,10 @@ public class HadoopPinotFS extends PinotFS {
ArrayList<String> filePathStrings = new ArrayList<>();
Path path = new Path(fileUri);
if (_hadoopFS.exists(path)) {
- RemoteIterator<LocatedFileStatus> fileListItr = _hadoopFS.listFiles(path, recursive);
- while (fileListItr != null && fileListItr.hasNext()) {
- LocatedFileStatus file = fileListItr.next();
- filePathStrings.add(file.getPath().toUri().toString());
+ // _hadoopFS.listFiles(path, false) does not include directories in its results, so use listStatus(path) here instead.
+ List<FileStatus> files = listStatus(path, recursive);
+ for (FileStatus file : files) {
+ filePathStrings.add(file.getPath().toUri().getRawPath());
}
} else {
throw new IllegalArgumentException("segmentUri is not valid");
@@ -151,6 +152,19 @@ public class HadoopPinotFS extends PinotFS {
return retArray;
}
+ private List<FileStatus> listStatus(Path path, boolean recursive) throws IOException {
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ FileStatus[] files = _hadoopFS.listStatus(path);
+ for (FileStatus file : files) {
+ fileStatuses.add(file);
+ if (file.isDirectory() && recursive) {
+ List<FileStatus> subFiles = listStatus(file.getPath(), true);
+ fileStatuses.addAll(subFiles);
+ }
+ }
+ return fileStatuses;
+ }
+
@Override
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org