You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/02/25 19:12:31 UTC

[incubator-pinot] branch master updated: Fix listFiles method in HadoopPinotFS (#3838)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e3a9184  Fix listFiles method in HadoopPinotFS (#3838)
e3a9184 is described below

commit e3a9184ff5f3cff0cc68630dbeca3968bf07ae4b
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Mon Feb 25 11:12:27 2019 -0800

    Fix listFiles method in HadoopPinotFS (#3838)
    
    * Fix listFiles method in HadoopPinotFS
---
 .../resources/LLCSegmentCompletionHandlers.java    |  3 +++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  6 +++++-
 .../org/apache/pinot/filesystem/HadoopPinotFS.java | 22 ++++++++++++++++++----
 3 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 8b5b36b..0d5de21 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -304,6 +304,9 @@ public class LLCSegmentCompletionHandlers {
           ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName));
       java.net.URI segmentFileURI;
       if (isSplitCommit) {
+        // We only clean up tmp segment file under table dir, so don't create any sub-dir under table dir.
+        // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
+        // TODO: move tmp file logic into SegmentCompletionUtils.
         String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
         segmentFileURI =
             ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5d9d290..6b32400 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -391,8 +391,12 @@ public class PinotLLCRealtimeSegmentManager {
       return false;
     }
 
+    // Cleans up tmp segment files under table dir.
+    // We only clean up tmp segment files in table level dir, so there's no need to list recursively.
+    // See LLCSegmentCompletionHandlers.uploadSegment().
+    // TODO: move tmp file logic into SegmentCompletionUtils.
     try {
-      for (String uri : pinotFS.listFiles(tableDirURI, true)) {
+      for (String uri : pinotFS.listFiles(tableDirURI, false)) {
         if (uri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
           LOGGER.warn("Deleting " + uri);
           pinotFS.delete(new URI(uri), true);
diff --git a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index d680431..1620b5a 100644
--- a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++ b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -138,10 +139,10 @@ public class HadoopPinotFS extends PinotFS {
     ArrayList<String> filePathStrings = new ArrayList<>();
     Path path = new Path(fileUri);
     if (_hadoopFS.exists(path)) {
-      RemoteIterator<LocatedFileStatus> fileListItr = _hadoopFS.listFiles(path, recursive);
-      while (fileListItr != null && fileListItr.hasNext()) {
-        LocatedFileStatus file = fileListItr.next();
-        filePathStrings.add(file.getPath().toUri().toString());
+      // _hadoopFS.listFiles(path, false) will not return directories as files, thus use listStatus(path) here.
+      List<FileStatus> files = listStatus(path, recursive);
+      for (FileStatus file : files) {
+        filePathStrings.add(file.getPath().toUri().getRawPath());
       }
     } else {
       throw new IllegalArgumentException("segmentUri is not valid");
@@ -151,6 +152,19 @@ public class HadoopPinotFS extends PinotFS {
     return retArray;
   }
 
+  private List<FileStatus> listStatus(Path path, boolean recursive) throws IOException {
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    FileStatus[] files = _hadoopFS.listStatus(path);
+    for (FileStatus file : files) {
+      fileStatuses.add(file);
+      if (file.isDirectory() && recursive) {
+        List<FileStatus> subFiles = listStatus(file.getPath(), true);
+        fileStatuses.addAll(subFiles);
+      }
+    }
+    return fileStatuses;
+  }
+
   @Override
   public void copyToLocalFile(URI srcUri, File dstFile)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org