Posted to commits@impala.apache.org by st...@apache.org on 2022/04/05 07:43:15 UTC

[impala] 03/03: IMPALA-10737: Optimize the number of Iceberg API Metadata requests

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit efba58f5f05da5dc1f1f5bb3c6fd812bf7f679b9
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Fri Mar 25 10:58:43 2022 +0100

    IMPALA-10737: Optimize the number of Iceberg API Metadata requests
    
    Iceberg stores the table metadata next to the data files; when this
    metadata is accessed through the Iceberg API, a filesystem call is
    executed (HDFS, S3, ADLS). These calls were issued from various places
    during query processing, and this patch unifies the Iceberg metadata
    requests in the CatalogD and the ImpalaDs (see the sketch below):
     - CatalogD loads and caches the org.apache.iceberg.Table object.
     - When ImpalaDs request the table metadata, the current catalog
       snapshot id is sent over, and the ImpalaD loads and caches the
       org.apache.iceberg.Table object through the Iceberg API as well.
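
    A minimal sketch of the two loads, with names taken from the diff
    below (illustrative only; the exact code differs):

        // CatalogD side: load through the Iceberg API once, cache the
        // result, and pin the snapshot id that is sent to the ImpalaDs.
        org.apache.iceberg.Table apiTable = IcebergUtil.loadTable(tbl);
        long catalogSnapshotId = apiTable.currentSnapshot() != null
            ? apiTable.currentSnapshot().snapshotId() : -1;
        tIcebergTable.setCatalog_snapshot_id(catalogSnapshotId);

        // ImpalaD side (local catalog): load and cache a separate
        // org.apache.iceberg.Table object via the MetaProvider.
        org.apache.iceberg.Table localTable =
            metaProvider.loadIcebergApiTable(ref, tableParams, msTable);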
    
    This approach (loading the Iceberg table twice) was chosen because
    the org.apache.iceberg.Table could not be meaningfully serialized and
    deserialized: the result of serializing a Table is a lightweight
    SerializableTable object, which lives in the Iceberg core package.
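
    A hedged illustration of that limitation (SerializableTable.copyOf
    is assumed here as the Iceberg core helper; it is not part of this
    patch):

        // Serializing yields a read-only, point-in-time copy rather
        // than a live catalog-backed table, so it cannot stand in for
        // the cached org.apache.iceberg.Table on the ImpalaD side.
        org.apache.iceberg.Table frozen =
            org.apache.iceberg.SerializableTable.copyOf(liveTable);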
    
    As a result, REFRESH/INVALIDATE METADATA is required to reload any
    Iceberg metadata changes, and the metadata load time is improved.
    The improvement is more significant for smaller queries, where the
    metadata request has a larger impact on the query execution time.
    
    Additionally, the dependency on the Iceberg core package has been
    reduced: most uses of the TableMetadata/BaseTable classes have been
    replaced with the Table class from the Iceberg api package.
    
    Testing:
     - Passed Iceberg E2E tests.
    
    Change-Id: I5492e0cdb31602f0276029c2645d14ff5cb2f672
    Reviewed-on: http://gerrit.cloudera.org:8080/18353
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                |  6 +-
 common/thrift/CatalogService.thrift                | 14 +--
 .../org/apache/impala/catalog/FeIcebergTable.java  | 58 +++++--------
 .../org/apache/impala/catalog/IcebergTable.java    | 66 +++++++--------
 .../catalog/IcebergTableLoadingException.java      | 32 +++++++
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |  5 ++
 .../impala/catalog/local/CatalogdMetaProvider.java | 94 +++++++++++++-------
 .../impala/catalog/local/DirectMetaProvider.java   | 23 +++--
 .../impala/catalog/local/LocalIcebergTable.java    | 99 ++++++++++++----------
 .../apache/impala/catalog/local/LocalTable.java    |  2 +-
 .../apache/impala/catalog/local/MetaProvider.java  | 17 ++--
 .../org/apache/impala/planner/IcebergScanNode.java | 15 +---
 .../apache/impala/service/CatalogOpExecutor.java   | 10 ++-
 .../java/org/apache/impala/service/Frontend.java   | 13 ++-
 .../impala/service/IcebergCatalogOpExecutor.java   | 15 +---
 .../java/org/apache/impala/util/IcebergUtil.java   | 43 ++--------
 .../impala/catalog/local/LocalCatalogTest.java     | 10 +--
 .../queries/QueryTest/iceberg-insert.test          |  1 +
 18 files changed, 269 insertions(+), 254 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index e49d2c99c..8aa1116c6 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -568,9 +568,9 @@ struct TIcebergTable {
   2: required list<TIcebergPartitionSpec> partition_spec
   3: required i32 default_partition_spec_id
   // Map from 128-bit Murmur3 hash of data file path to its file descriptor
-  4: optional map<string,THdfsFileDesc> path_hash_to_file_descriptor
-  // Iceberg snapshot id of the table
-  5: optional i64 snapshot_id
+  4: optional map<string, THdfsFileDesc> path_hash_to_file_descriptor
+  // Snapshot id of the org.apache.iceberg.Table object cached in the CatalogD
+  5: optional i64 catalog_snapshot_id;
   // Iceberg 'write.parquet.compression-codec' and 'write.parquet.compression-level' table
   // properties
   6: optional TCompressionCodec parquet_compression_codec
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 886c41be3..ba98df9d6 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -390,9 +390,8 @@ struct TTableInfoSelector {
   // it in cases the clients do need HMS partition structs.
   12: bool want_hms_partition
 
-  // The response should contain information about the Iceberg snapshot, i.e. the snapshot
-  // id and the file descriptors.
-  13: bool want_iceberg_snapshot
+  // The response should contain information about the Iceberg table.
+  13: bool want_iceberg_table
 }
 
 // Returned information about a particular partition.
@@ -442,11 +441,6 @@ struct TPartialPartitionInfo {
   13: optional CatalogObjects.THdfsPartitionLocation location
 }
 
-struct TIcebergSnapshot {
-  1: required i64 snapshot_id
-  2: optional map<string, CatalogObjects.THdfsFileDesc> iceberg_file_desc_map
-}
-
 // Returned information about a Table, as selected by TTableInfoSelector.
 struct TPartialTableInfo {
   1: optional hive_metastore.Table hms_table
@@ -487,8 +481,8 @@ struct TPartialTableInfo {
   // the description of how a prefix is computed.
   11: optional list<string> partition_prefixes
 
-  // Iceberg snapshot information
-  12: optional TIcebergSnapshot iceberg_snapshot
+  // Iceberg table information
+  12: optional CatalogObjects.TIcebergTable iceberg_table
 }
 
 struct TBriefTableMeta {
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index c3ee3f13c..e28a6559a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -35,8 +35,6 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.LiteralExpr;
@@ -44,8 +42,6 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Reference;
 import org.apache.impala.compat.HdfsShim;
-import org.apache.impala.fb.FbFileDesc;
-import org.apache.impala.fb.FbIcebergMetadata;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.THdfsCompression;
@@ -54,7 +50,6 @@ import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
-import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TIcebergTable;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TResultSet;
@@ -89,6 +84,11 @@ public interface FeIcebergTable extends FeFsTable {
    */
   TIcebergCatalog getIcebergCatalog();
 
+  /**
+   * Returns the cached Iceberg Table object that stores the metadata loaded by Iceberg.
+   */
+  org.apache.iceberg.Table getIcebergApiTable();
+
   /**
    * Return Iceberg catalog location, we use this location to load metadata from Iceberg
    * When using 'hadoop.tables', this value equals to table location
@@ -146,7 +146,9 @@ public interface FeIcebergTable extends FeFsTable {
   /**
    * @return the Iceberg schema.
    */
-  Schema getIcebergSchema();
+  default Schema getIcebergSchema() {
+    return getIcebergApiTable().schema();
+  }
 
   @Override
   default boolean isCacheable() {
@@ -267,20 +269,14 @@ public interface FeIcebergTable extends FeFsTable {
   }
 
   /**
-   * Current snapshot id of the table.
+   * Returns the current snapshot id of the Iceberg API table if it exists, otherwise
+   * returns -1.
    */
-  long snapshotId();
-
-  /**
-   * Utility class to hold information about Iceberg snapshots.
-   */
-  public static class Snapshot {
-    public Snapshot(long snapshotId, Map<String, FileDescriptor> pathHashToFileDescMap) {
-      this.snapshotId = snapshotId;
-      this.pathHashToFileDescMap = pathHashToFileDescMap;
+  default long snapshotId() {
+    if (getIcebergApiTable() != null && getIcebergApiTable().currentSnapshot() != null) {
+      return getIcebergApiTable().currentSnapshot().snapshotId();
     }
-    public long snapshotId;
-    public Map<String, FileDescriptor> pathHashToFileDescMap;
+    return -1;
   }
 
   /**
@@ -308,10 +304,9 @@ public interface FeIcebergTable extends FeFsTable {
       resultSchema.addToColumns(new TColumn("Field Partition Transform",
           Type.STRING.toThrift()));
 
-      TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(table);
-      if (!metadata.specs().isEmpty()) {
+      if (!table.getIcebergApiTable().specs().isEmpty()) {
         // Just show the current PartitionSpec from Iceberg table metadata
-        PartitionSpec latestSpec = metadata.spec();
+        PartitionSpec latestSpec = table.getIcebergApiTable().spec();
         HashMap<String, Integer> transformParams =
             IcebergUtil.getPartitionTransformParams(latestSpec);
         for(PartitionField field : latestSpec.fields()) {
@@ -419,7 +414,7 @@ public interface FeIcebergTable extends FeFsTable {
       tIcebergTable.setPath_hash_to_file_descriptor(
           convertPathHashToFileDescMap(icebergTable));
 
-      tIcebergTable.setSnapshot_id(icebergTable.snapshotId());
+      tIcebergTable.setCatalog_snapshot_id(icebergTable.snapshotId());
       tIcebergTable.setParquet_compression_codec(
           icebergTable.getIcebergParquetCompressionCodec());
       tIcebergTable.setParquet_row_group_size(
@@ -465,13 +460,6 @@ public interface FeIcebergTable extends FeFsTable {
       return fileDescMap;
     }
 
-    public static TIcebergSnapshot createTIcebergSnapshot(FeIcebergTable icebergTable) {
-      TIcebergSnapshot snapshot = new TIcebergSnapshot();
-      snapshot.setSnapshot_id(icebergTable.snapshotId());
-      snapshot.setIceberg_file_desc_map(convertPathHashToFileDescMap(icebergTable));
-      return snapshot;
-    }
-
     /**
      * Get FileDescriptor by data file location
      */
@@ -521,8 +509,6 @@ public interface FeIcebergTable extends FeFsTable {
             hdfsFileDescMap.put(path.toUri().getPath(), fileDesc);
         }
       }
-      // TODO: remove Iceberg table load once IMPALA-10737 is resolved.
-      Table iceTbl = IcebergUtil.loadTable(table);
       Map<String, HdfsPartition.FileDescriptor> fileDescMap = new HashMap<>();
       List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(table,
           new ArrayList<>(), /*timeTravelSpecl=*/null);
@@ -533,7 +519,7 @@ public interface FeIcebergTable extends FeFsTable {
             HdfsPartition.FileDescriptor fsFd = hdfsFileDescMap.get(
                 path.toUri().getPath());
             HdfsPartition.FileDescriptor iceFd = fsFd.cloneWithFileMetadata(
-                IcebergUtil.createIcebergMetadata(table, iceTbl, dataFile));
+                IcebergUtil.createIcebergMetadata(table, dataFile));
             fileDescMap.put(pathHash, iceFd);
           } else {
             LOG.warn("Iceberg DataFile '{}' cannot be found in the HDFS recursive file "
@@ -542,7 +528,7 @@ public interface FeIcebergTable extends FeFsTable {
                 new Path(dataFile.path().toString()),
                 new Path(table.getIcebergTableLocation()), table.getHostIndex());
             HdfsPartition.FileDescriptor iceFd = fileDesc.cloneWithFileMetadata(
-                IcebergUtil.createIcebergMetadata(table, iceTbl, dataFile));
+                IcebergUtil.createIcebergMetadata(table, dataFile));
             fileDescMap.put(IcebergUtil.getDataFilePathHash(dataFile), iceFd);
           }
       }
@@ -553,9 +539,9 @@ public interface FeIcebergTable extends FeFsTable {
      * Get iceberg partition spec by iceberg table metadata
      */
     public static List<IcebergPartitionSpec> loadPartitionSpecByIceberg(
-        TableMetadata metadata) throws TableLoadingException {
+        FeIcebergTable table) throws TableLoadingException {
       List<IcebergPartitionSpec> ret = new ArrayList<>();
-      for (PartitionSpec spec : metadata.specs()) {
+      for (PartitionSpec spec : table.getIcebergApiTable().specs().values()) {
         ret.add(convertPartitionSpec(spec));
       }
       return ret;
@@ -579,7 +565,7 @@ public interface FeIcebergTable extends FeFsTable {
       List<IcebergPartitionSpec> specs = feIcebergTable.getPartitionSpecs();
       Preconditions.checkState(specs != null);
       if (specs.isEmpty()) return null;
-      int defaultSpecId = feIcebergTable.getDefaultPartitionSpecId();
+      int defaultSpecId = feIcebergTable.getIcebergApiTable().spec().specId();
       Preconditions.checkState(specs.size() > defaultSpecId);
       return specs.get(defaultSpecId);
     }
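
FeIcebergTable now derives the schema and the snapshot id from the cached
API table, as the hunks above show. A hedged usage sketch (illustrative
only, not part of the patch):

    FeIcebergTable tbl = ...;                   // any implementation
    Schema schema = tbl.getIcebergSchema();     // getIcebergApiTable().schema()
    long snapshotId = tbl.snapshotId();         // -1 when no snapshot exists yet
    PartitionSpec spec = tbl.getIcebergApiTable().spec();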
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 42a6ae972..8f3cfbf2d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -27,9 +27,6 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.types.Types;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionTransform;
@@ -39,7 +36,6 @@ import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsCompression;
-import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
@@ -156,16 +152,17 @@ public class IcebergTable extends Table implements FeIcebergTable {
   // last item of the list is the latest.
   private int defaultPartitionSpecId_;
 
-  // Schema of the iceberg table.
-  private Schema icebergSchema_;
-
   // Key is the DataFile path hash, value is FileDescriptor transformed from DataFile
   private Map<String, FileDescriptor> pathHashToFileDescMap_;
 
   // Treat iceberg table as a non-partitioned hdfs table in backend
   private HdfsTable hdfsTable_;
 
-  private long snapshotId_ = -1;
+  // Cached Iceberg API table object.
+  private org.apache.iceberg.Table icebergApiTable_;
+
+  // The snapshot id cached in the CatalogD, necessary to synchronize the caches.
+  private long catalogSnapshotId_ = -1;
 
   protected IcebergTable(org.apache.hadoop.hive.metastore.api.Table msTable,
       Db db, String name, String owner) {
@@ -203,6 +200,11 @@ public class IcebergTable extends Table implements FeIcebergTable {
     return hdfsTable_;
   }
 
+  @Override
+  public org.apache.iceberg.Table getIcebergApiTable() {
+    return icebergApiTable_;
+  }
+
   @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;
@@ -221,11 +223,6 @@ public class IcebergTable extends Table implements FeIcebergTable {
     return ICEBERG_STORAGE_HANDLER;
   }
 
-  @Override
-  public Schema getIcebergSchema() {
-    return icebergSchema_;
-  }
-
   public static boolean isIcebergStorageHandler(String handler) {
     return handler != null && handler.equals(ICEBERG_STORAGE_HANDLER);
   }
@@ -291,7 +288,9 @@ public class IcebergTable extends Table implements FeIcebergTable {
   }
 
   @Override
-  public int getDefaultPartitionSpecId() { return defaultPartitionSpecId_; }
+  public int getDefaultPartitionSpecId() {
+    return defaultPartitionSpecId_;
+  }
 
   @Override
   public Map<String, FileDescriptor> getPathHashToFileDescMap() {
@@ -300,7 +299,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
 
   @Override
   public long snapshotId() {
-    return snapshotId_;
+    return catalogSnapshotId_;
   }
 
   @Override
@@ -333,11 +332,9 @@ public class IcebergTable extends Table implements FeIcebergTable {
       final Timer.Context ctxStorageLdTime =
           getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
       try {
-        TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(this);
-        if (metadata.currentSnapshot() != null) {
-            snapshotId_ = metadata.currentSnapshot().snapshotId();
-        }
-        loadSchemaFromIceberg(metadata);
+        icebergApiTable_ = IcebergUtil.loadTable(this);
+        catalogSnapshotId_ = FeIcebergTable.super.snapshotId();
+        loadSchemaFromIceberg();
         // Loading hdfs table after loaded schema from Iceberg,
         // in case we create external Iceberg table skipping column info in sql.
         icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTbl);
@@ -350,8 +347,8 @@ public class IcebergTable extends Table implements FeIcebergTable {
         pathHashToFileDescMap_ = Utils.loadAllPartition(this);
         loadAllColumnStats(msClient);
       } catch (Exception e) {
-        throw new TableLoadingException("Error loading metadata for Iceberg table " +
-            icebergTableLocation_, e);
+        throw new IcebergTableLoadingException("Error loading metadata for Iceberg table "
+            + icebergTableLocation_, e);
       } finally {
         storageMetadataLoadTime_ = ctxStorageLdTime.stop();
       }
@@ -378,11 +375,10 @@ public class IcebergTable extends Table implements FeIcebergTable {
   /**
    * Load schema and partitioning schemes directly from Iceberg.
    */
-  public void loadSchemaFromIceberg(TableMetadata metadata) throws TableLoadingException {
-    icebergSchema_ = metadata.schema();
+  public void loadSchemaFromIceberg() throws TableLoadingException {
     loadSchema();
-    partitionSpecs_ = Utils.loadPartitionSpecByIceberg(metadata);
-    defaultPartitionSpecId_ = metadata.defaultSpecId();
+    partitionSpecs_ = Utils.loadPartitionSpecByIceberg(this);
+    defaultPartitionSpecId_ = icebergApiTable_.spec().specId();
   }
 
   /**
@@ -390,8 +386,9 @@ public class IcebergTable extends Table implements FeIcebergTable {
    */
   private void loadSchema() throws TableLoadingException {
     clearColumns();
-    msTable_.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(icebergSchema_));
-    for (Column col : IcebergSchemaConverter.convertToImpalaSchema(icebergSchema_)) {
+    msTable_.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(
+        getIcebergSchema()));
+    for (Column col : IcebergSchemaConverter.convertToImpalaSchema(getIcebergSchema())) {
       addColumn(col);
     }
   }
@@ -420,12 +417,13 @@ public class IcebergTable extends Table implements FeIcebergTable {
     defaultPartitionSpecId_ = ticeberg.getDefault_partition_spec_id();
     // Load file descriptors for the Iceberg snapshot. We are using the same host index,
     // so there's no need for translation.
+    catalogSnapshotId_ = ticeberg.catalog_snapshot_id;
+    // The Iceberg API table needs to be available and cached even when loaded through
+    // thrift.
+    icebergApiTable_ = IcebergUtil.loadTable(this);
     pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift(
         ticeberg.getPath_hash_to_file_descriptor(), null, null);
-    snapshotId_ = ticeberg.getSnapshot_id();
     hdfsTable_.loadFromThrift(thriftTable);
-    TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(this);
-    icebergSchema_ = metadata.schema();
   }
 
   private List<IcebergPartitionSpec> loadPartitionBySpecsFromThrift(
@@ -482,12 +480,12 @@ public class IcebergTable extends Table implements FeIcebergTable {
     Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos = new HashMap<>();
     TGetPartialCatalogObjectResponse resp =
         getHdfsTable().getPartialInfo(req, missingPartialInfos);
-    if (req.table_info_selector.want_iceberg_snapshot) {
-      resp.table_info.setIceberg_snapshot(
-          FeIcebergTable.Utils.createTIcebergSnapshot(this));
+    if (req.table_info_selector.want_iceberg_table) {
+      resp.table_info.setIceberg_table(Utils.getTIcebergTable(this));
       if (!resp.table_info.isSetNetwork_addresses()) {
         resp.table_info.setNetwork_addresses(getHostIndex().getList());
       }
+      resp.table_info.iceberg_table.setCatalog_snapshot_id(catalogSnapshotId_);
     }
     return resp;
   }
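
The loadFromThrift() hunk above is the second load mentioned in the commit
message: an ImpalaD running the legacy catalog re-reads the metadata through
the Iceberg API when it deserializes the table. A simplified sketch of that
sequence:

    catalogSnapshotId_ = ticeberg.catalog_snapshot_id;  // id pinned by the CatalogD
    icebergApiTable_ = IcebergUtil.loadTable(this);     // Iceberg API read on the ImpalaD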
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTableLoadingException.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTableLoadingException.java
new file mode 100644
index 000000000..4b2d691b4
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTableLoadingException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+/**
+ * Thrown when the Iceberg table metadata cannot be loaded due to an error.
+ */
+public class IcebergTableLoadingException extends TableLoadingException {
+
+  public IcebergTableLoadingException(String s, Throwable cause) {
+    super(s, cause);
+  }
+
+  public IcebergTableLoadingException(String s) {
+    super(s);
+  }
+};
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index e0c986f79..0adaaa4cd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -254,6 +254,11 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
     return -1;
   }
 
+  @Override
+  public org.apache.iceberg.Table getIcebergApiTable() {
+    return null;
+  }
+
   public void addColumn(IcebergColumn col) {
     colsByPos_.add(col);
     colsByName_.put(col.getName().toLowerCase(), col);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 7dff758ec..794124f44 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -54,7 +54,6 @@ import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogDeltaLog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogObjectCache;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -64,6 +63,7 @@ import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
 import org.apache.impala.catalog.Principal;
 import org.apache.impala.catalog.PrincipalPrivilege;
 import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
@@ -84,7 +84,6 @@ import org.apache.impala.thrift.TFunctionName;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsFileDesc;
-import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TPartialTableInfo;
@@ -96,6 +95,7 @@ import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -106,7 +106,6 @@ import org.ehcache.sizeof.SizeOf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -609,7 +608,7 @@ public class CatalogdMetaProvider implements MetaProvider {
             req.catalog_info_selector.want_db_names = true;
             TGetPartialCatalogObjectResponse resp = sendRequest(req);
             checkResponse(resp.catalog_info != null && resp.catalog_info.db_names != null,
-                req, "missing table names");
+                req, "missing database names");
             return ImmutableList.copyOf(resp.catalog_info.db_names);
           }
     });
@@ -1024,35 +1023,48 @@ public class CatalogdMetaProvider implements MetaProvider {
     return ret;
   }
 
-  /**
-   * Utility function for to retrieve table info with Iceberg snapshot. Exists for testing
-   * purposes, use loadIcebergSnapshot() which translates the file descriptors to the
-   * table's host index.
-   */
-  @VisibleForTesting
-  TPartialTableInfo loadTableInfoWithIcebergSnapshot(final TableMetaRef table)
-      throws TException {
+  @Override
+  public TPartialTableInfo loadIcebergTable(final TableMetaRef table) throws TException {
     Preconditions.checkArgument(table instanceof TableMetaRefImpl);
-    TGetPartialCatalogObjectRequest req = newReqForTable(table);
-    req.table_info_selector.want_iceberg_snapshot = true;
-    TGetPartialCatalogObjectResponse resp = sendRequest(req);
-    return resp.table_info;
+    TableMetaRefImpl tableRef = (TableMetaRefImpl)table;
+    String itemStr = "iceberg metadata for " + tableRef.dbName_ + "."
+        + tableRef.tableName_;
+    IcebergMetaCacheKey cacheKey = new IcebergMetaCacheKey(tableRef);
+    return loadWithCaching(itemStr, TABLE_METADATA_CACHE_CATEGORY, cacheKey,
+        new Callable<TPartialTableInfo>() {
+          @Override
+          public TPartialTableInfo call() throws Exception {
+            TGetPartialCatalogObjectRequest req = newReqForTable(table);
+            req.table_info_selector.want_iceberg_table = true;
+            TGetPartialCatalogObjectResponse resp = sendRequest(req);
+            checkResponse(resp.table_info != null &&
+                resp.table_info.iceberg_table != null, req,
+                "missing Iceberg table metadata");
+            return resp.getTable_info();
+          }
+    });
   }
 
   @Override
-  public FeIcebergTable.Snapshot loadIcebergSnapshot(final TableMetaRef table,
-      ListMap<TNetworkAddress> hostIndex)
-      throws TException {
-    TPartialTableInfo tableInfo = loadTableInfoWithIcebergSnapshot(table);
-    Map<String, FileDescriptor> pathToFds =
-        FeIcebergTable.Utils.loadFileDescMapFromThrift(
-            tableInfo.getIceberg_snapshot().getIceberg_file_desc_map(),
-            tableInfo.getNetwork_addresses(),
-            hostIndex);
-    return new FeIcebergTable.Snapshot(
-        tableInfo.getIceberg_snapshot().getSnapshot_id(),
-        pathToFds);
-  }
+  public org.apache.iceberg.Table loadIcebergApiTable(final TableMetaRef table,
+      TableParams params, Table msTable) throws TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
+    TableMetaRefImpl tableRef = (TableMetaRefImpl)table;
+    String itemStr = "iceberg api table for " + tableRef.dbName_ + "."
+        + tableRef.tableName_;
+    IcebergApiTableCacheKey cacheKey = new IcebergApiTableCacheKey(tableRef);
+    return loadWithCaching(itemStr, TABLE_METADATA_CACHE_CATEGORY, cacheKey,
+        new Callable<org.apache.iceberg.Table>() {
+          @Override
+          public org.apache.iceberg.Table call() throws Exception {
+            return IcebergUtil.loadTable(
+                params.getIcebergCatalog(),
+                IcebergUtil.getIcebergTableIdentifier(msTable),
+                params.getIcebergCatalogLocation(),
+                msTable.getParameters());
+          }
+        });
+    }
 
   private ImmutableList<FileDescriptor> convertThriftFdList(List<THdfsFileDesc> thriftFds,
       List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> hostIndex) {
@@ -1942,6 +1954,30 @@ public class CatalogdMetaProvider implements MetaProvider {
     }
   }
 
+  /**
+   * Cache key for an entry storing Iceberg table metadata.
+   *
+   * Values for these keys are 'TPartialTableInfo' objects.
+   */
+  private static class IcebergMetaCacheKey extends VersionedTableCacheKey {
+
+    public IcebergMetaCacheKey(TableMetaRefImpl table) {
+      super(table);
+    }
+  }
+
+  /**
+   * Cache key for an entry storing Iceberg API metadata.
+   *
+   * Values for these keys are 'org.apache.iceberg.Table' objects.
+   */
+  private static class IcebergApiTableCacheKey extends VersionedTableCacheKey {
+
+    public IcebergApiTableCacheKey(TableMetaRefImpl table) {
+      super(table);
+    }
+  }
+
   @VisibleForTesting
   static class SizeOfWeigher implements Weigher<Object, Object> {
     // Bypass flyweight objects like small boxed integers, Boolean.TRUE, enums, etc.
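
Both new loads in CatalogdMetaProvider are wrapped in loadWithCaching() with
per-table-version cache keys, so repeated requests for an unchanged table are
served from the local cache. A hedged call sketch (variable names are
illustrative):

    // The first call populates the cache; later calls for the same table
    // version return the cached objects without contacting the CatalogD
    // or the filesystem.
    TPartialTableInfo tableInfo = metaProvider.loadIcebergTable(ref);
    org.apache.iceberg.Table apiTable =
        metaProvider.loadIcebergApiTable(ref, tableParams, msTable);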
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 8b741fc77..f2479dced 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -19,17 +19,11 @@ package org.apache.impala.catalog.local;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
-import com.google.common.collect.Iterables;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -43,7 +37,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogException;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FileMetadataLoader;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCachePool;
@@ -53,13 +46,14 @@ import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.Pair;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TBriefTableMeta;
-import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialTableInfo;
 import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.ListMap;
@@ -541,10 +535,15 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   @Override
-  public FeIcebergTable.Snapshot loadIcebergSnapshot(final TableMetaRef table,
-      ListMap<TNetworkAddress> hostIndex)
-      throws TException {
+  public TPartialTableInfo loadIcebergTable(final TableMetaRef table) throws TException {
+    throw new NotImplementedException(
+        "loadIcebergTable() is not implemented for DirectMetaProvider");
+  }
+
+  @Override
+  public org.apache.iceberg.Table loadIcebergApiTable(final TableMetaRef table,
+      TableParams param, Table msTable) throws TException {
     throw new NotImplementedException(
-        "loadIcebergSnapshot() is not implemented for DirectMetaProvider");
+        "loadIcebergApiTable() is not implemented for DirectMetaProvider");
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 09bec8ff8..57a6268dc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.catalog.local;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -26,8 +25,6 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.TableMetadata;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.catalog.CatalogObject;
 import org.apache.impala.catalog.Column;
@@ -42,13 +39,11 @@ import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
-import org.apache.impala.thrift.TIcebergSnapshot;
-import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialTableInfo;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
-import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.Immutable;
@@ -70,29 +65,35 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   private int defaultPartitionSpecId_;
   private Map<String, FileDescriptor> pathHashToFileDescMap_;
   private LocalFsTable localFsTable_;
-  private long snapshotId_ = -1;
-  private Schema icebergSchema_;
 
-  static LocalTable loadFromIceberg(LocalDb db, Table msTable,
+  // The snapshot id of the current snapshot stored in the CatalogD.
+  long catalogSnapshotId_;
+
+  // Cached Iceberg API table object.
+  private org.apache.iceberg.Table icebergApiTable_;
+
+  /**
+   * Loads the Iceberg metadata from the CatalogD then initializes a LocalIcebergTable.
+   */
+  static LocalTable loadIcebergTableViaMetaProvider(LocalDb db, Table msTable,
       MetaProvider.TableMetaRef ref) throws TableLoadingException {
     Preconditions.checkNotNull(db);
     Preconditions.checkNotNull(msTable);
     try {
-      TableParams params = new TableParams(msTable);
-      TableMetadata metadata =
-          IcebergUtil.getIcebergTableMetadata(params.icebergCatalog_,
-              IcebergUtil.getIcebergTableIdentifier(msTable),
-              params.icebergCatalogLocation_,
-              msTable.getParameters());
-      List<Column> iceColumns =
-          IcebergSchemaConverter.convertToImpalaSchema(metadata.schema());
+      TableParams tableParams = new TableParams(msTable);
+      TPartialTableInfo tableInfo = db.getCatalog().getMetaProvider()
+          .loadIcebergTable(ref);
+      org.apache.iceberg.Table icebergApiTable = db.getCatalog().getMetaProvider()
+          .loadIcebergApiTable(ref, tableParams, msTable);
+      List<Column> iceColumns = IcebergSchemaConverter.convertToImpalaSchema(
+          icebergApiTable.schema());
       validateColumns(iceColumns, msTable.getSd().getCols());
       ColumnMap colMap = new ColumnMap(iceColumns,
           /*numClusteringCols=*/ 0,
           db.getName() + "." + msTable.getTableName(),
           /*isFullAcidSchema=*/false);
-
-      return new LocalIcebergTable(db, msTable, ref, colMap, metadata);
+      return new LocalIcebergTable(db, msTable, ref, colMap, tableInfo, tableParams,
+          icebergApiTable);
     } catch (Exception e) {
       String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
       throw new TableLoadingException(
@@ -101,25 +102,20 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   }
 
   private LocalIcebergTable(LocalDb db, Table msTable, MetaProvider.TableMetaRef ref,
-      ColumnMap cmap, TableMetadata metadata)
-      throws TableLoadingException {
+      ColumnMap cmap, TPartialTableInfo tableInfo, TableParams tableParams,
+      org.apache.iceberg.Table icebergApiTable) throws TableLoadingException {
     super(db, msTable, ref, cmap);
+    Preconditions.checkNotNull(tableInfo);
     localFsTable_ = LocalFsTable.load(db, msTable, ref);
-    tableParams_ = new TableParams(msTable);
-    FeIcebergTable.Snapshot snapshot;
-    try {
-      snapshot = db_.getCatalog().getMetaProvider().loadIcebergSnapshot(
-          ref, getHostIndex());
-      Preconditions.checkNotNull(snapshot);
-      snapshotId_ = snapshot.snapshotId;
-      pathHashToFileDescMap_ = snapshot.pathHashToFileDescMap;
-    } catch (TException e) {
-      throw new TableLoadingException(String.format(
-          "Failed to load table: %s.%s", msTable.getDbName(), msTable.getTableName()), e);
-    }
-    partitionSpecs_ = Utils.loadPartitionSpecByIceberg(metadata);
-    defaultPartitionSpecId_ = metadata.defaultSpecId();
-    icebergSchema_ = metadata.schema();
+    tableParams_ = tableParams;
+    pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift(
+        tableInfo.getIceberg_table().getPath_hash_to_file_descriptor(),
+        tableInfo.getNetwork_addresses(),
+        getHostIndex());
+    icebergApiTable_ = icebergApiTable;
+    catalogSnapshotId_ = tableInfo.getIceberg_table().getCatalog_snapshot_id();
+    partitionSpecs_ = Utils.loadPartitionSpecByIceberg(this);
+    defaultPartitionSpecId_ = tableInfo.getIceberg_table().getDefault_partition_spec_id();
     icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTable);
     icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTable);
     icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTable);
@@ -175,11 +171,6 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     return tableParams_.icebergCatalogLocation_;
   }
 
-  @Override
-  public Schema getIcebergSchema() {
-    return icebergSchema_;
-  }
-
   @Override
   public FeFsTable getFeFsTable() {
     return localFsTable_;
@@ -191,7 +182,9 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   }
 
   @Override
-  public int getDefaultPartitionSpecId() { return defaultPartitionSpecId_; }
+  public int getDefaultPartitionSpecId() {
+    return defaultPartitionSpecId_;
+  }
 
   @Override
   public IcebergPartitionSpec getDefaultPartitionSpec() {
@@ -203,9 +196,14 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     return pathHashToFileDescMap_;
   }
 
+  @Override
+  public org.apache.iceberg.Table getIcebergApiTable() {
+    return icebergApiTable_;
+  }
+
   @Override
   public long snapshotId() {
-    return snapshotId_;
+    return catalogSnapshotId_;
   }
 
   @Override
@@ -215,7 +213,6 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
         FeCatalogUtils.getTColumnDescriptors(this),
         getNumClusteringCols(),
         name_, db_.getName());
-
     desc.setIcebergTable(Utils.getTIcebergTable(this));
     desc.setHdfsTable(transfromToTHdfsTable());
     return desc;
@@ -244,7 +241,7 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   }
 
   @Immutable
-  private static class TableParams {
+  public static class TableParams {
     private final String icebergTableLocation_;
     private final TIcebergCatalog icebergCatalog_;
     private final String icebergCatalogLocation_;
@@ -265,5 +262,17 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
         icebergCatalogLocation_ = icebergTableLocation_;
       }
     }
+
+    public String getIcebergTableLocation() {
+      return icebergTableLocation_;
+    }
+
+    public TIcebergCatalog getIcebergCatalog() {
+      return icebergCatalog_;
+    }
+
+    public String getIcebergCatalogLocation() {
+      return icebergCatalogLocation_;
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index f09b65696..f056c2be8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -104,7 +104,7 @@ abstract class LocalTable implements FeTable {
     } else if (KuduTable.isKuduTable(msTbl)) {
       t = LocalKuduTable.loadFromKudu(db, msTbl, ref);
     } else if (IcebergTable.isIcebergTable(msTbl)) {
-      t = LocalIcebergTable.loadFromIceberg(db, msTbl, ref);
+      t = LocalIcebergTable.loadIcebergTableViaMetaProvider(db, msTbl, ref);
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
       // TODO(todd) support datasource table
     } else if (HdfsFileFormat.isHdfsInputFormatClass(
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 524871dc8..3e3840df6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -28,17 +28,17 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogException;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsPartitionLocationCompressor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TBriefTableMeta;
-import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialTableInfo;
 import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.ListMap;
 import org.apache.thrift.TException;
@@ -126,11 +126,16 @@ public interface MetaProvider {
       List<String> colNames) throws TException;
 
   /**
-   * Loads Iceberg snapshot information, i.e. snapshot id and file descriptors.
+   * Loads Iceberg related table metadata.
    */
-  public FeIcebergTable.Snapshot loadIcebergSnapshot(final TableMetaRef table,
-      ListMap<TNetworkAddress> hostIndex)
-      throws TException;
+  public TPartialTableInfo loadIcebergTable(
+      final TableMetaRef table) throws TException;
+
+  /**
+   * Loads the Iceberg API table metadata through the Iceberg library.
+   */
+  public org.apache.iceberg.Table loadIcebergApiTable(
+      final TableMetaRef table, TableParams param, Table msTable) throws TException;
 
   /**
    * Reference to a table as returned by loadTable(). This reference must be passed
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index d13a0b778..ca5a670be 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -114,7 +114,6 @@ public class IcebergScanNode extends HdfsScanNode {
     }
     long dataFilesCacheMisses = 0;
     List<FileDescriptor> fileDescList = new ArrayList<>();
-    org.apache.iceberg.Table iceTbl = null;
     for (DataFile dataFile : dataFileList) {
       FileDescriptor fileDesc = icebergTable_.getPathHashToFileDescMap()
           .get(IcebergUtil.getDataFilePathHash(dataFile));
@@ -139,18 +138,8 @@ public class IcebergScanNode extends HdfsScanNode {
               "Cannot load file descriptor for: " + dataFile.path());
         }
         // Add file descriptor to the cache.
-        try {
-          if (iceTbl == null) {
-            // TODO: remove Iceberg table load once IMPALA-10737 is resolved.
-            iceTbl = IcebergUtil.loadTable(icebergTable_);
-          }
-          fileDesc = fileDesc.cloneWithFileMetadata(
-              IcebergUtil.createIcebergMetadata(icebergTable_, iceTbl, dataFile));
-        } catch (TableLoadingException e) {
-          // TODO: get rid of try-catch TableLoadingException once we have IMPALA-10737.
-          throw new ImpalaRuntimeException(String.format(
-              "Failed to load Iceberg table: %s", icebergTable_.getFullName()), e);
-        }
+        fileDesc = fileDesc.cloneWithFileMetadata(
+              IcebergUtil.createIcebergMetadata(icebergTable_, dataFile));
         icebergTable_.getPathHashToFileDescMap().put(
             IcebergUtil.getDataFilePathHash(dataFile), fileDesc);
       }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4d6a59a96..e3b24dba4 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -102,6 +102,7 @@ import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FileMetadataLoadOpts;
 import org.apache.impala.catalog.IcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
@@ -6061,8 +6062,13 @@ public class CatalogOpExecutor {
               //     ACID tables, there is a Jira to cover this: HIVE-22062.
               //   2: If no need for a full table reload then fetch partition level
               //     writeIds and reload only the ones that changed.
-              updatedThriftTable = catalog_.reloadTable(tbl, req, resultType, cmdString,
-                  -1);
+              try {
+                updatedThriftTable = catalog_.reloadTable(tbl, req, resultType, cmdString,
+                    -1);
+              } catch (IcebergTableLoadingException e) {
+                updatedThriftTable = catalog_.invalidateTable(
+                    req.getTable_name(), tblWasRemoved, dbWasAdded);
+              }
             }
           } else {
             // Table was loaded from scratch, so it's already "refreshed".
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 2992ceeb3..1e47cc8b2 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.TableMetadata;
 import org.apache.impala.analysis.AlterDbStmt;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -1228,21 +1227,19 @@ public class Frontend {
     FeTable feTable = getCatalog().getTable(params.getTable_name().db_name,
         params.getTable_name().table_name);
     FeIcebergTable feIcebergTable = (FeIcebergTable) feTable;
-    TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(feIcebergTable);
-    Table table = IcebergUtil.loadTable(feIcebergTable);
+    Table table = feIcebergTable.getIcebergApiTable();
     Set<Long> ancestorIds = Sets.newHashSet(IcebergUtil.currentAncestorIds(table));
     TGetTableHistoryResult historyResult = new TGetTableHistoryResult();
 
-    List<HistoryEntry> filteredHistoryEntries =
-        metadata.snapshotLog().stream().collect(Collectors.toList());
+    List<HistoryEntry> filteredHistoryEntries = table.history();
     if (params.isSetFrom_time()) {
       // DESCRIBE HISTORY <table> FROM <ts>
-      filteredHistoryEntries = metadata.snapshotLog().stream()
+      filteredHistoryEntries = table.history().stream()
           .filter(c -> c.timestampMillis() >= params.from_time)
           .collect(Collectors.toList());
     } else if (params.isSetBetween_start_time() && params.isSetBetween_end_time()) {
       // DESCRIBE HISTORY <table> BETWEEN <ts> AND <ts>
-      filteredHistoryEntries = metadata.snapshotLog().stream()
+      filteredHistoryEntries = table.history().stream()
           .filter(x -> x.timestampMillis() >= params.between_start_time &&
               x.timestampMillis() <= params.between_end_time)
           .collect(Collectors.toList());
@@ -1254,7 +1251,7 @@ public class Frontend {
       long snapshotId = historyEntry.snapshotId();
       resultItem.setCreation_time(historyEntry.timestampMillis());
       resultItem.setSnapshot_id(snapshotId);
-      Snapshot snapshot = metadata.snapshot(snapshotId);
+      Snapshot snapshot = table.snapshot(snapshotId);
       if (snapshot != null && snapshot.parentId() != null) {
         resultItem.setParent_id(snapshot.parentId());
       }
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 5de7ef481..f577d96f7 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.impala.service;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,15 +39,11 @@ import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
-import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
@@ -94,9 +89,7 @@ public class IcebergCatalogOpExecutor {
   public static void populateExternalTableCols(
       org.apache.hadoop.hive.metastore.api.Table msTbl, Table iceTbl)
       throws TableLoadingException {
-    TableMetadata metadata = ((BaseTable)iceTbl).operations().current();
-    Schema schema = metadata.schema();
-    msTbl.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(schema));
+    msTbl.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(iceTbl.schema()));
   }
 
   /**
@@ -166,10 +159,9 @@ public class IcebergCatalogOpExecutor {
   public static void alterTableSetPartitionSpec(FeIcebergTable feTable,
       TIcebergPartitionSpec partSpec, String catalogServiceId, long catalogVersion)
       throws TableLoadingException, ImpalaRuntimeException {
-    BaseTable iceTable = (BaseTable)IcebergUtil.loadTable(feTable);
+    BaseTable iceTable = (BaseTable)feTable.getIcebergApiTable();
     TableOperations tableOp = iceTable.operations();
     TableMetadata metadata = tableOp.current();
-
     Schema schema = metadata.schema();
     PartitionSpec newPartSpec = IcebergUtil.createIcebergPartition(schema, partSpec);
     TableMetadata newMetadata = metadata.updatePartitionSpec(newPartSpec);
@@ -280,8 +272,7 @@ public class IcebergCatalogOpExecutor {
   public static void appendFiles(FeIcebergTable feIcebergTable, Transaction txn,
       TIcebergOperationParam icebergOp) throws ImpalaRuntimeException,
       TableLoadingException {
-    org.apache.iceberg.Table nativeIcebergTable =
-        IcebergUtil.loadTable(feIcebergTable);
+    org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable();
     List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb();
     BatchWrite batchWrite;
     if (icebergOp.isIs_overwrite()) {
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 0d2aa5b5b..94888fc7e 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -47,7 +47,6 @@ import org.apache.impala.fb.FbIcebergDataFileFormat;
 import org.apache.impala.fb.FbIcebergMetadata;
 import org.apache.impala.fb.FbIcebergPartitionTransformValue;
 import org.apache.impala.fb.FbIcebergTransformType;
-import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.DataFile;
@@ -61,10 +60,8 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
-import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -76,7 +73,6 @@ import org.apache.impala.analysis.TimeTravelSpec.Kind;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog;
@@ -86,14 +82,12 @@ import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.catalog.iceberg.IcebergCatalogs;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCompressionCodec;
-import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsCompression;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionField;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
-import org.apache.impala.thrift.TIcebergPartitionTransform;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
 
 public class IcebergUtil {
@@ -149,27 +143,6 @@ public class IcebergUtil {
     }
   }
 
-  /**
-   * Get TableMetadata by FeIcebergTable
-   */
-  public static TableMetadata getIcebergTableMetadata(FeIcebergTable table)
-      throws TableLoadingException {
-    BaseTable iceTable = (BaseTable)IcebergUtil.loadTable(table);
-    return iceTable.operations().current();
-  }
-
-  /**
-   * Get TableMetadata by related info tableName is table full name, usually
-   * database.table
-   */
-  public static TableMetadata getIcebergTableMetadata(TIcebergCatalog catalog,
-      TableIdentifier tableId, String location, Map<String, String> tableProps)
-      throws TableLoadingException {
-    BaseTable baseTable = (BaseTable)IcebergUtil.loadTable(catalog,
-        tableId, location, tableProps);
-    return baseTable.operations().current();
-  }
-
   /**
    * Get Iceberg table identifier by table property
    */
@@ -201,7 +174,7 @@ public class IcebergUtil {
    */
   public static Transaction getIcebergTransaction(FeIcebergTable feTable)
       throws TableLoadingException, ImpalaRuntimeException {
-    return getIcebergCatalog(feTable).loadTable(feTable).newTransaction();
+    return feTable.getIcebergApiTable().newTransaction();
   }
 
   /**
@@ -564,8 +537,7 @@ public class IcebergUtil {
 
   private static TableScan createScanAsOf(FeIcebergTable table,
       TimeTravelSpec timeTravelSpec) throws TableLoadingException {
-    BaseTable baseTable = (BaseTable)IcebergUtil.loadTable(table);
-    TableScan scan = baseTable.newScan();
+    TableScan scan = table.getIcebergApiTable().newScan();
     if (timeTravelSpec == null) {
       scan = scan.useSnapshot(table.snapshotId());
     } else {
@@ -865,10 +837,9 @@ public class IcebergUtil {
    * It creates a flatbuffer so it can be passed between machines and processes without
    * further de/serialization.
    */
-  public static FbFileMetadata createIcebergMetadata(FeIcebergTable feTbl,
-      Table iceTbl, DataFile df) throws TableLoadingException {
+  public static FbFileMetadata createIcebergMetadata(FeIcebergTable feTbl, DataFile df) {
     FlatBufferBuilder fbb = new FlatBufferBuilder(1);
-    int iceOffset = createIcebergMetadata(feTbl, iceTbl, fbb, df);
+    int iceOffset = createIcebergMetadata(feTbl, fbb, df);
     fbb.finish(FbFileMetadata.createFbFileMetadata(fbb, iceOffset));
     ByteBuffer bb = fbb.dataBuffer().slice();
     ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity());
@@ -876,10 +847,10 @@ public class IcebergUtil {
     return FbFileMetadata.getRootAsFbFileMetadata((ByteBuffer)compressedBb.flip());
   }
 
-  private static int createIcebergMetadata(FeIcebergTable feTbl, Table iceTbl,
-      FlatBufferBuilder fbb, DataFile df) throws TableLoadingException {
+  private static int createIcebergMetadata(FeIcebergTable feTbl, FlatBufferBuilder fbb,
+      DataFile df) {
     int partKeysOffset = -1;
-    PartitionSpec spec = iceTbl.specs().get(df.specId());
+    PartitionSpec spec = feTbl.getIcebergApiTable().specs().get(df.specId());
     if (spec != null && !spec.fields().isEmpty()) {
       partKeysOffset = createPartitionKeys(feTbl, fbb, spec, df);
     }
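
With the API table cached on every FeIcebergTable, transactions and scans now
reuse the same object instead of reloading it from the filesystem. A hedged
sketch based on the hunks above:

    Transaction txn = feTable.getIcebergApiTable().newTransaction();
    TableScan scan = feTable.getIcebergApiTable().newScan()
        .useSnapshot(feTable.snapshotId());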
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 61721e4d4..824d9b5b4 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
-import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ToSqlUtils;
@@ -41,19 +40,17 @@ import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.service.Frontend;
-import org.apache.impala.service.MetadataOp;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TMetadataOpcode;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartialTableInfo;
-import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.MetaStoreUtil;
@@ -61,7 +58,6 @@ import org.apache.impala.util.PatternMatcher;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -264,12 +260,12 @@ public class LocalCatalogTest {
     LocalIcebergTable t = (LocalIcebergTable)catalog_.getTable(
         "functional_parquet", "iceberg_partitioned");
     Map<String, FileDescriptor> localTblFdMap = t.getPathHashToFileDescMap();
-    TPartialTableInfo tblInfo = provider_.loadTableInfoWithIcebergSnapshot(t.ref_);
+    TPartialTableInfo tblInfo = provider_.loadIcebergTable(t.ref_);
     ListMap<TNetworkAddress> catalogdHostIndexes = new ListMap<>();
     catalogdHostIndexes.populate(tblInfo.getNetwork_addresses());
     Map<String, FileDescriptor> catalogFdMap =
         FeIcebergTable.Utils.loadFileDescMapFromThrift(
-            tblInfo.getIceberg_snapshot().getIceberg_file_desc_map(),
+            tblInfo.getIceberg_table().getPath_hash_to_file_descriptor(),
             null, null);
     for (Map.Entry<String, FileDescriptor> entry : localTblFdMap.entrySet()) {
       String path = entry.getKey();
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
index a05b440b7..bffbd97d3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
@@ -239,6 +239,7 @@ drop table iceberg_hive_cat
 ====
 ---- QUERY
 # The data has been purged, so querying the external table fails.
+refresh iceberg_hive_cat_ext_2;
 select * from iceberg_hive_cat_ext_2
 ---- CATCH
 Table does not exist