You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/02/02 09:12:46 UTC

[impala] 03/04: IMPALA-11076: Reuse FDs loaded by HdfsTable during IcebergTable load

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5fd859ee0caefc9c9321cbdd669d9ed8ab89d43b
Author: Tamas Mate <tm...@cloudera.com>
AuthorDate: Fri Jan 28 16:05:55 2022 +0100

    IMPALA-11076: Reuse FDs loaded by HdfsTable during IcebergTable load
    
    Impala used the FileSystem.getFileStatus() on every Iceberg DataFile to
    create the FileDescriptors. This operation is redundant because there is
    an internal HdfsTable inside the IcebergTable which loads the
    FileDescriptors recursively as well earlier.
    
    This commit updates the loadAllPartition operation to reuse the
    HdfsTable's FileDescriptors.
    
    Testing:
     - Executed the Iceberg tests.
    
    Change-Id: Id49daec60237dd6c8cce92c00a09fd9a6e6479b4
    Reviewed-on: http://gerrit.cloudera.org:8080/18160
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/catalog/FeIcebergTable.java  | 39 ++++++++++++++++------
 1 file changed, 29 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 1cb632b..e52e91f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -501,20 +501,39 @@ public interface FeIcebergTable extends FeFsTable {
     }
 
     /**
-     * Get all FileDescriptor from iceberg table without any predicates.
+     * Returns the FileDescriptors loaded by the internal HdfsTable. To avoid returning
+     * the metadata files the resulset is limited to the files that are tracked by
+     * Iceberg. Both the HdfsBaseDir and the DataFile path can contain the scheme in their
+     * path, using org.apache.hadoop.fs.Path to normalize the paths.
      */
     public static Map<String, HdfsPartition.FileDescriptor> loadAllPartition(
-        FeIcebergTable table) throws IOException, TableLoadingException {
-      // Empty predicates
+        IcebergTable table) throws IOException, TableLoadingException {
+      Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap = new HashMap<>();
+      Collection<HdfsPartition> partitions =
+          ((HdfsTable)table.getFeFsTable()).partitionMap_.values();
+      for (HdfsPartition partition : partitions) {
+        for (FileDescriptor fileDesc : partition.getFileDescriptors()) {
+            Path path = new Path(table.getHdfsBaseDir() + Path.SEPARATOR +
+                fileDesc.getRelativePath());
+            hdfsFileDescMap.put(path.toUri().getPath(), fileDesc);
+        }
+      }
+      Map<String, HdfsPartition.FileDescriptor> fileDescMap = new HashMap<>();
       List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(table,
           new ArrayList<>(), /*timeTravelSpecl=*/null);
-
-      Map<String, HdfsPartition.FileDescriptor> fileDescMap = new HashMap<>();
-      for (DataFile file : dataFileList) {
-        HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(
-            new Path(file.path().toString()),
-            new Path(table.getIcebergTableLocation()), table.getHostIndex());
-        fileDescMap.put(IcebergUtil.getDataFilePathHash(file), fileDesc);
+      for (DataFile dataFile : dataFileList) {
+          Path path = new Path(dataFile.path().toString());
+          if (hdfsFileDescMap.containsKey(path.toUri().getPath())) {
+            String pathHash = IcebergUtil.getDataFilePathHash(dataFile);
+            fileDescMap.put(pathHash, hdfsFileDescMap.get(path.toUri().getPath()));
+          } else {
+            LOG.warn("Iceberg DataFile '{}' cannot be found in the HDFS recursive file "
+                + "listing results.", path.toString());
+            HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(
+                new Path(dataFile.path().toString()),
+                new Path(table.getIcebergTableLocation()), table.getHostIndex());
+            fileDescMap.put(IcebergUtil.getDataFilePathHash(dataFile), fileDesc);
+          }
       }
       return fileDescMap;
     }