You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2023/08/01 13:47:45 UTC

[impala] branch master updated: IMPALA-12298: Improve incremental load of Iceberg tables

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new c84221369 IMPALA-12298: Improve incremental load of Iceberg tables
c84221369 is described below

commit c8422136962b8d08e5f44d8351fb4fe7cdb675b8
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Jul 26 19:28:45 2023 +0200

    IMPALA-12298: Improve incremental load of Iceberg tables
    
    Currently Impala reloads the whole table with all its metadata
    when a table is updated, even if no files were modified or only
    a few files were added. This hurts performance for large tables,
    especially when Hadoop RPC encryption is enabled. See HADOOP-14558 and
    HADOOP-10768 for details.
    
    This patch adds an optimization to only load the newly added files
    if their number is at or below a threshold. The threshold can be set
    by the backend flag 'iceberg_reload_new_files_threshold' (100 by
    default). If there are more new files than the threshold, we fall
    back to the old behavior.
    
    Testing:
     * added Unit test
     * manually checked the TRACE logs of IcebergFileMetadataLoader
    
    Change-Id: Icf643798a93e74ae7b0f37ceeab0a8052fb2699d
    Reviewed-on: http://gerrit.cloudera.org:8080/20271
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   5 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/catalog/FeIcebergTable.java  |   8 +-
 .../apache/impala/catalog/FileMetadataLoader.java  |   2 +-
 .../impala/catalog/IcebergFileMetadataLoader.java  | 147 ++++++++++++-
 .../org/apache/impala/catalog/IcebergTable.java    |   4 +-
 .../catalog/iceberg/GroupedContentFiles.java       |   5 +
 .../org/apache/impala/common/FileSystemUtil.java   |  17 +-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../impala/catalog/FileMetadataLoaderTest.java     | 227 +++++++++++++++++++--
 11 files changed, 393 insertions(+), 30 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 09e981c96..d58b56c42 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -408,6 +408,11 @@ DEFINE_int64(update_catalogd_rpc_resend_interval_ms, 100, "(Advanced) Interval (
     "with which the statestore resends the update catalogd RPC to a subscriber if the "
     "statestore has failed to send the RPC to the subscriber.");
 
+DEFINE_int32(iceberg_reload_new_files_threshold, 100, "(Advanced) If during a table "
+    "refresh the number of new files are greater than this, catalogd will completely "
+    "reload all file metadata. If number of new files are less or equal to this, "
+    "catalogd will only load the metadata of the newly added files.");
+
 // TGeospatialLibrary's values are mapped here as constants
 static const string geo_lib_none = "NONE";
 static const string geo_lib_hive_esri = "HIVE_ESRI";
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 965547515..c10f472e4 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -107,6 +107,7 @@ DECLARE_string(geospatial_library);
 DECLARE_int32(thrift_rpc_max_message_size);
 DECLARE_string(file_metadata_reload_properties);
 DECLARE_string(java_weigher);
+DECLARE_int32(iceberg_reload_new_files_threshold);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -421,6 +422,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_thrift_rpc_max_message_size(FLAGS_thrift_rpc_max_message_size);
   cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor);
   cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm");
+  cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 5e4868082..6cee3ab7a 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -258,4 +258,6 @@ struct TBackendGflags {
   113: required double scan_range_cost_factor
 
   114: required bool use_jamm_weigher
+
+  115: required i32 iceberg_reload_new_files_threshold
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 1d579cac5..c55c9f7fa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -698,18 +698,16 @@ public interface FeIcebergTable extends FeFsTable {
         FileStatus fileStatus, FeIcebergTable table) throws IOException {
       Reference<Long> numUnknownDiskIds = new Reference<>(0L);
 
-      String relPath = null;
       String absPath = null;
       Path tableLoc = new Path(table.getIcebergTableLocation());
-      URI relUri = tableLoc.toUri().relativize(fileStatus.getPath().toUri());
-      if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
+      String relPath = FileSystemUtil.relativizePathNoThrow(
+          fileStatus.getPath(), tableLoc);
+      if (relPath == null) {
         if (Utils.requiresDataFilesInTableLocation(table)) {
           throw new RuntimeException(fileStatus.getPath()
               + " is outside of the Iceberg table location " + tableLoc);
         }
         absPath = fileStatus.getPath().toString();
-      } else {
-        relPath = relUri.getPath();
       }
 
       if (!FileSystemUtil.supportsStorageIds(fs)) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 9a514b05d..16720e01e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -67,7 +67,7 @@ public class FileMetadataLoader {
 
   protected boolean forceRefreshLocations = false;
 
-  private List<FileDescriptor> loadedFds_;
+  protected List<FileDescriptor> loadedFds_;
   private List<FileDescriptor> loadedInsertDeltaFds_;
   private List<FileDescriptor> loadedDeleteDeltaFds_;
   protected LoadStats loadStats_;
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index c365e9cac..e0540baa7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -17,13 +17,17 @@
 
 package org.apache.impala.catalog;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,13 +40,32 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Reference;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.ListMap;
 
+import org.apache.impala.util.ThreadNameAnnotator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Utility for loading the content files metadata of the Iceberg tables.
  */
 public class IcebergFileMetadataLoader extends FileMetadataLoader {
+  private final static Logger LOG = LoggerFactory.getLogger(
+      IcebergFileMetadataLoader.class);
+
+  private static final Configuration CONF = new Configuration();
+
+  // Default value of 'newFilesThreshold_' if the given parameter or startup flag have
+  // invalid value.
+  private final int NEW_FILES_THRESHOLD_DEFAULT = 100;
+
+  // If there are more new files than 'newFilesThreshold_', we should fall back
+  // to regular file metadata loading.
+  private final int newFilesThreshold_;
+
   private final GroupedContentFiles icebergFiles_;
   private final boolean canDataBeOutsideOfTableLocation_;
 
@@ -50,10 +73,82 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
       List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
       ValidTxnList validTxnList, ValidWriteIdList writeIds,
       GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) {
+    this(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds,
+        icebergFiles, canDataBeOutsideOfTableLocation,
+        BackendConfig.INSTANCE.icebergReloadNewFilesThreshold());
+  }
+
+  public IcebergFileMetadataLoader(Path partDir, boolean recursive,
+      List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+      ValidTxnList validTxnList, ValidWriteIdList writeIds,
+      GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation,
+      int newFilesThresholdParam) {
     super(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds,
         HdfsFileFormat.ICEBERG);
     icebergFiles_ = icebergFiles;
     canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation;
+    if (newFilesThresholdParam >= 0) {
+      newFilesThreshold_ = newFilesThresholdParam;
+    } else {
+      newFilesThreshold_ = NEW_FILES_THRESHOLD_DEFAULT;
+      LOG.warn("Ignoring invalid new files threshold: {} " +
+          "using value: {}", newFilesThresholdParam, newFilesThreshold_);
+    }
+  }
+
+  @Override
+  public void load() throws CatalogException, IOException {
+    if (!shouldReuseOldFds()) {
+      super.load();
+    } else {
+      reloadWithOldFds();
+    }
+  }
+
+  /**
+   *  Iceberg tables are a collection of immutable, uniquely identifiable data files,
+   *  which means we can safely reuse the old FDs.
+   */
+  private void reloadWithOldFds() throws IOException {
+    loadStats_ = new LoadStats(partDir_);
+    FileSystem fs = partDir_.getFileSystem(CONF);
+
+    String msg = String.format("Refreshing Iceberg file metadata from path %s " +
+        "while reusing old file descriptors", partDir_);
+    LOG.trace(msg);
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
+      loadedFds_ = new ArrayList<>();
+      Reference<Long> numUnknownDiskIds = new Reference<>(0L);
+      for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
+        FileDescriptor fd = getOldFd(contentFile);
+        if (fd == null) {
+          fd = getFileDescriptor(fs, contentFile, numUnknownDiskIds);
+        } else {
+          ++loadStats_.skippedFiles;
+        }
+        loadedFds_.add(Preconditions.checkNotNull(fd));
+      }
+      Preconditions.checkState(loadStats_.loadedFiles <= newFilesThreshold_);
+      loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(loadStats_.debugString());
+      }
+    }
+  }
+
+  private FileDescriptor getFileDescriptor(FileSystem fs, ContentFile<?> contentFile,
+        Reference<Long> numUnknownDiskIds) throws IOException {
+    Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
+        new Path(contentFile.path().toString()));
+    FileStatus stat;
+    if (FileSystemUtil.supportsStorageIds(fs)) {
+      stat = Utils.createLocatedFileStatus(fileLoc, fs);
+    } else {
+      // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves.
+      stat = Utils.createFileStatus(contentFile, fileLoc);
+    }
+    return getFileDescriptor(fs, FileSystemUtil.supportsStorageIds(fs),
+        numUnknownDiskIds, stat);
   }
 
   /**
@@ -63,18 +158,15 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
   @Override
   protected FileDescriptor getFileDescriptor(FileSystem fs, boolean listWithLocations,
       Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws IOException {
-    String relPath = null;
     String absPath = null;
-    URI relUri = partDir_.toUri().relativize(fileStatus.getPath().toUri());
-    if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
+    String relPath = FileSystemUtil.relativizePathNoThrow(fileStatus.getPath(), partDir_);
+    if (relPath == null) {
       if (canDataBeOutsideOfTableLocation_) {
         absPath = fileStatus.getPath().toString();
       } else {
         throw new IOException(String.format("Failed to load Iceberg datafile %s, because "
             + "it's outside of the table location", fileStatus.getPath().toUri()));
       }
-    } else {
-      relPath = relUri.getPath();
     }
 
     String path = Strings.isNullOrEmpty(relPath) ? absPath : relPath;
@@ -136,4 +228,47 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
     }
     return stats;
   }
+
+  @VisibleForTesting
+  boolean shouldReuseOldFds() throws IOException {
+    if (oldFdsByPath_ == null || oldFdsByPath_.isEmpty()) return false;
+    if (forceRefreshLocations) return false;
+
+    int oldFdsSize = oldFdsByPath_.size();
+    int iceContentFilesSize = icebergFiles_.size();
+
+    if (iceContentFilesSize - oldFdsSize > newFilesThreshold_) {
+      LOG.trace("There are at least {} new files under path {}.",
+          iceContentFilesSize - oldFdsSize, partDir_);
+      return false;
+    }
+
+    int newFiles = 0;
+    for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
+      if (getOldFd(contentFile) == null) {
+        ++newFiles;
+        if (newFiles > newFilesThreshold_) {
+          LOG.trace("There are at least {} new files under path {}.", newFiles, partDir_);
+          return false;
+        }
+      }
+    }
+    LOG.trace("There are only {} new files under path {}.", newFiles, partDir_);
+    return true;
+  }
+
+  FileDescriptor getOldFd(ContentFile<?> contentFile) throws IOException {
+    Path contentFilePath = FileSystemUtil.createFullyQualifiedPath(
+        new Path(contentFile.path().toString()));
+    String lookupPath = FileSystemUtil.relativizePathNoThrow(contentFilePath, partDir_);
+    if (lookupPath == null) {
+      if (canDataBeOutsideOfTableLocation_) {
+        lookupPath = contentFilePath.toString();
+      } else {
+        throw new IOException(String.format("Failed to load Iceberg datafile %s, because "
+            + "it's outside of the table location", contentFilePath));
+      }
+    }
+    return oldFdsByPath_.get(lookupPath);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 91eefeee9..4c83acf02 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -334,7 +334,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
    * propagate alterations made to the Iceberg table to HMS.
    */
   @Override
-  public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
+  public void load(boolean reuseMetadata, IMetaStoreClient msClient,
       org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
       throws TableLoadingException {
     final Timer.Context context =
@@ -362,7 +362,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
         hdfsTable_.setIcebergFiles(icebergFiles);
         hdfsTable_.setCanDataBeOutsideOfTableLocation(
             !Utils.requiresDataFilesInTableLocation(this));
-        hdfsTable_.load(msClient, msTable_, reason);
+        hdfsTable_.load(reuseMetadata, msClient, msTable_, reason);
         fileStore_ = Utils.loadAllPartition(this, icebergFiles);
         partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
         setIcebergTableStats();
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
index da3aac4da..a01f43c10 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
@@ -57,6 +57,11 @@ public class GroupedContentFiles {
     return Iterables.concat(dataFilesWithoutDeletes, dataFilesWithDeletes, deleteFiles);
   }
 
+  public int size() {
+    return dataFilesWithDeletes.size() + dataFilesWithoutDeletes.size() +
+        deleteFiles.size();
+  }
+
   public boolean isEmpty() {
     return Iterables.isEmpty(getAllContentFiles());
   }
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index bd88f4a98..519449237 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -965,13 +965,28 @@ public class FileSystemUtil {
    */
   public static String relativizePath(Path path, Path startPath) {
     URI relUri = startPath.toUri().relativize(path.toUri());
-    if (relUri.isAbsolute() || relUri.getPath().startsWith("/")) {
+    if (!isRelative(relUri)) {
       throw new RuntimeException("FileSystem returned an unexpected path " +
           path + " for a file within " + startPath);
     }
     return relUri.getPath();
   }
 
+  /**
+   * Return the path of 'path' relative to the startPath. This may
+   * differ from simply the file name in the case of recursive listings.
+   * Instead of throwing an exception, it returns null when cannot relativize 'path'.
+   */
+  public static String relativizePathNoThrow(Path path, Path startPath) {
+    URI relUri = startPath.toUri().relativize(path.toUri());
+    if (isRelative(relUri)) return relUri.getPath();
+    return null;
+  }
+
+  private static boolean isRelative(URI uri) {
+    return !(uri.isAbsolute() || uri.getPath().startsWith(Path.SEPARATOR));
+  }
+
   /**
    * Util method to check if the given file status relative to its parent is contained
    * in a ignored directory. This is useful to ignore the files which seemingly are valid
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index ac7f62715..29352d908 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -415,4 +415,8 @@ public class BackendConfig {
   public boolean useJammWeigher() {
     return backendCfg_.use_jamm_weigher;
   }
+
+  public int icebergReloadNewFilesThreshold() {
+    return backendCfg_.iceberg_reload_new_files_threshold;
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 4f278850c..d84e04ff2 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,8 +32,11 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
 import org.junit.Test;
 
@@ -113,15 +117,15 @@ public class FileMetadataLoaderTest {
 
   @Test
   public void testIcebergLoading() throws IOException, CatalogException {
-    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
-    Path table1Path = new Path(
-        "hdfs://localhost:20500/test-warehouse/iceberg_test/iceberg_partitioned");
-    FileMetadataLoader fml1 = new FileMetadataLoader(table1Path, /* recursive=*/true,
-        /* oldFds = */ Collections.emptyList(), hostIndex, null, null,
-        HdfsFileFormat.ICEBERG);
+    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+    IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
     fml1.load();
-    assertEquals(25, fml1.getStats().loadedFiles);
-    assertEquals(25, fml1.getLoadedFds().size());
+    assertEquals(20, fml1.getStats().loadedFiles);
+    assertEquals(0, fml1.getStats().skippedFiles);
+    assertEquals(20, fml1.getLoadedFds().size());
 
     ArrayList<String> relPaths = new ArrayList<>(
         Collections2.transform(fml1.getLoadedFds(), FileDescriptor::getRelativePath));
@@ -129,14 +133,14 @@ public class FileMetadataLoaderTest {
     assertEquals("data/event_time_hour=2020-01-01-08/action=view/" +
         "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet", relPaths.get(0));
 
-    Path table2Path = new Path(
-        "hdfs://localhost:20500/test-warehouse/iceberg_test/iceberg_non_partitioned");
-    FileMetadataLoader fml2 = new FileMetadataLoader(table2Path, /* recursive=*/true,
-        /* oldFds = */ Collections.emptyList(), hostIndex, null, null,
-        HdfsFileFormat.ICEBERG);
+    IcebergFileMetadataLoader fml2 =  getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
     fml2.load();
-    assertEquals(25, fml2.getStats().loadedFiles);
-    assertEquals(25, fml2.getLoadedFds().size());
+    assertEquals(20, fml2.getStats().loadedFiles);
+    assertEquals(0, fml2.getStats().skippedFiles);
+    assertEquals(20, fml2.getLoadedFds().size());
 
     relPaths = new ArrayList<>(
         Collections2.transform(fml2.getLoadedFds(), FileDescriptor::getRelativePath));
@@ -145,6 +149,199 @@ public class FileMetadataLoaderTest {
         relPaths.get(0));
   }
 
+  @Test
+  public void testIcebergRefresh() throws IOException, CatalogException {
+    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+    IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    fml1.load();
+
+    IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ fml1.getLoadedFds(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    assertTrue(fml1Refresh.shouldReuseOldFds());
+    fml1Refresh.load();
+    assertEquals(0, fml1Refresh.getStats().loadedFiles);
+    assertEquals(20, fml1Refresh.getStats().skippedFiles);
+    assertEquals(20, fml1Refresh.getLoadedFds().size());
+
+    List<String> relPaths = new ArrayList<>(
+        Collections2.transform(fml1Refresh.getLoadedFds(),
+        FileDescriptor::getRelativePath));
+    Collections.sort(relPaths);
+    assertEquals("data/event_time_hour=2020-01-01-08/action=view/" +
+        "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet", relPaths.get(0));
+
+    IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    fml2.load();
+
+    IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ fml2.getLoadedFds(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    assertTrue(fml2Refresh.shouldReuseOldFds());
+    fml2Refresh.load();
+    assertEquals(0, fml2Refresh.getStats().loadedFiles);
+    assertEquals(20, fml2Refresh.getStats().skippedFiles);
+    assertEquals(20, fml2Refresh.getLoadedFds().size());
+
+    relPaths = new ArrayList<>(
+        Collections2.transform(fml2Refresh.getLoadedFds(),
+            FileDescriptor::getRelativePath));
+    Collections.sort(relPaths);
+    assertEquals("data/00001-1-5dbd44ad-18bc-40f2-9dd6-aeb2cc23457c-00000.parquet",
+        relPaths.get(0));
+  }
+
+  @Test
+  public void testIcebergPartialRefresh() throws IOException, CatalogException {
+    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+    IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    fml1.load();
+
+    IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    assertTrue(fml1Refresh.shouldReuseOldFds());
+    fml1Refresh.load();
+    assertEquals(10, fml1Refresh.getStats().loadedFiles);
+    assertEquals(10, fml1Refresh.getStats().skippedFiles);
+    assertEquals(20, fml1Refresh.getLoadedFds().size());
+
+    IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    fml2.load();
+
+    IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    assertTrue(fml2Refresh.shouldReuseOldFds());
+    fml2Refresh.load();
+    assertEquals(10, fml2Refresh.getStats().loadedFiles);
+    assertEquals(10, fml2Refresh.getStats().skippedFiles);
+    assertEquals(20, fml2Refresh.getLoadedFds().size());
+  }
+
+  @Test
+  public void testIcebergNewFilesThreshold() throws IOException, CatalogException {
+    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+    IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    fml1.load();
+
+    IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    assertTrue(fml1Refresh.shouldReuseOldFds());
+    fml1Refresh.setForceRefreshBlockLocations(true);
+    assertFalse(fml1Refresh.shouldReuseOldFds());
+    fml1Refresh.setForceRefreshBlockLocations(false);
+    assertTrue(fml1Refresh.shouldReuseOldFds());
+
+    IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false, 10);
+    assertTrue(fml1Refresh10.shouldReuseOldFds());
+    IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_partitioned",
+        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false, 9);
+    assertFalse(fml1Refresh9.shouldReuseOldFds());
+
+
+    IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    fml2.load();
+
+    IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false);
+    IcebergFileMetadataLoader fml2Refresh10 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false, 10);
+    assertTrue(fml2Refresh10.shouldReuseOldFds());
+    IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_non_partitioned",
+        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* canDataBeOutsideOfTableLocation = */ false, 9);
+    assertFalse(fml2Refresh9.shouldReuseOldFds());
+  }
+
+  @Test
+  public void testIcebergMultipleStorageLocations() throws IOException, CatalogException {
+    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+    IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_multiple_storage_locations",
+        /* oldFds = */ Collections.emptyList(),
+        /* canDataBeOutsideOfTableLocation = */ true);
+    fml1.load();
+
+    IcebergFileMetadataLoader fml1Refresh1 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_multiple_storage_locations",
+        /* oldFds = */ fml1.getLoadedFds().subList(0, 1),
+        /* canDataBeOutsideOfTableLocation = */ true);
+    assertTrue(fml1Refresh1.shouldReuseOldFds());
+    fml1Refresh1.load();
+    assertEquals(5, fml1Refresh1.getStats().loadedFiles);
+    assertEquals(1, fml1Refresh1.getStats().skippedFiles);
+    assertEquals(6, fml1Refresh1.getLoadedFds().size());
+
+    IcebergFileMetadataLoader fml1Refresh5 = getLoaderForIcebergTable(catalog,
+        "functional_parquet", "iceberg_multiple_storage_locations",
+        /* oldFds = */ fml1.getLoadedFds().subList(0, 5),
+        /* canDataBeOutsideOfTableLocation = */ true);
+    assertTrue(fml1Refresh5.shouldReuseOldFds());
+    fml1Refresh5.load();
+    assertEquals(1, fml1Refresh5.getStats().loadedFiles);
+    assertEquals(5, fml1Refresh5.getStats().skippedFiles);
+    assertEquals(6, fml1Refresh5.getLoadedFds().size());
+  }
+
+  private IcebergFileMetadataLoader getLoaderForIcebergTable(
+      CatalogServiceCatalog catalog, String dbName, String tblName,
+      List<FileDescriptor> oldFds, boolean canDataBeOutsideOfTableLocation)
+      throws CatalogException {
+    return getLoaderForIcebergTable(catalog, dbName, tblName, oldFds,
+        canDataBeOutsideOfTableLocation, -1);
+  }
+
+  private IcebergFileMetadataLoader getLoaderForIcebergTable(
+      CatalogServiceCatalog catalog, String dbName, String tblName,
+      List<FileDescriptor> oldFds, boolean canDataBeOutsideOfTableLocation,
+      int newFilesThreshold)
+      throws CatalogException {
+    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+    FeIcebergTable iceT = (FeIcebergTable)catalog.getOrLoadTable(
+        dbName, tblName, "test", null);
+    Path location = new Path(iceT.getLocation());
+    GroupedContentFiles iceFiles = IcebergUtil.getIcebergFiles(iceT,
+        /*predicates=*/Collections.emptyList(), /*timeTravelSpec=*/null);
+    return new IcebergFileMetadataLoader(location, /* recursive=*/true,
+        oldFds, hostIndex, null, null, iceFiles, canDataBeOutsideOfTableLocation,
+        newFilesThreshold);
+  }
+
   private FileMetadataLoader getLoaderForAcidTable(
       String validWriteIdString, String path, HdfsFileFormat format)
       throws IOException, CatalogException {