You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/10/16 09:02:36 UTC

[impala] 01/03: IMPALA-10152: part 1: refactor Iceberg catalog handling

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 05ad4ce05fca2f3bcd01946836c657c5479adca6
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Oct 9 19:12:58 2020 +0200

    IMPALA-10152: part 1: refactor Iceberg catalog handling
    
    This patch refactors the code a bit to make it easier in the future
    to add support for new Iceberg catalogs. We plan to add support for
    HiveCatalog in the near future.
    
    Iceberg has two main interfaces to manage tables: Tables and Catalog.
    I created a new interface in Impala called 'IcebergCatalog' that
    abstracts both Tables and Catalog. Currently there are two
    implementations for IcebergCatalog:
    * IcebergHadoopTables for HadoopTables
    * IcebergHadoopCatalog for HadoopCatalog
    
    This patch also delegates dropTable() to the Iceberg catalogs. Until
    this patch we let HMS drop the tables and delete the directories. It
    worked fine with the filesystem-based catalogs, but might not work well
    with other Iceberg catalogs like HiveCatalog.
    
    Change-Id: Ie69dff6cd6b8b3dc0ba5f7671b8504a936032a85
    Reviewed-on: http://gerrit.cloudera.org:8080/16575
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/CreateTableStmt.java    |   2 +-
 .../org/apache/impala/analysis/ToSqlUtils.java     |   2 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |   2 +-
 .../org/apache/impala/catalog/IcebergTable.java    |  11 ++-
 .../impala/catalog/iceberg/IcebergCatalog.java     |  65 +++++++++++++
 .../catalog/iceberg/IcebergHadoopCatalog.java      |  84 +++++++++++++++++
 .../catalog/iceberg/IcebergHadoopTables.java       |  96 +++++++++++++++++++
 .../impala/catalog/local/LocalIcebergTable.java    |   6 +-
 .../org/apache/impala/planner/IcebergScanNode.java |  11 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |  22 ++++-
 .../impala/service/IcebergCatalogOpExecutor.java   |  54 +++++------
 .../java/org/apache/impala/util/IcebergUtil.java   | 105 +++++++++++----------
 tests/query_test/test_iceberg.py                   |  14 ++-
 13 files changed, 378 insertions(+), 96 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index ff6df71..10ec916 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -608,7 +608,7 @@ public class CreateTableStmt extends StatementBase {
 
     // Some constraints for Iceberg table with 'hadoop.catalog'
     if (catalog == null || catalog.isEmpty() ||
-        IcebergUtil.getIcebergCatalog(catalog) == TIcebergCatalog.HADOOP_CATALOG) {
+        IcebergUtil.getTIcebergCatalog(catalog) == TIcebergCatalog.HADOOP_CATALOG) {
       // Table location cannot be set in SQL when using 'hadoop.catalog'
       if (getLocation() != null) {
         throw new AnalysisException(String.format("Location cannot be set for Iceberg " +
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 6e33f67..1919a33 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -552,7 +552,7 @@ public class ToSqlUtils {
     // Iceberg table with 'hadoop.catalog' do not display table LOCATION when using
     // 'show create table', user can use 'describe formatted/extended' to get location
     TIcebergCatalog icebergCatalog =
-        IcebergUtil.getIcebergCatalog(tblProperties.get(IcebergTable.ICEBERG_CATALOG));
+        IcebergUtil.getTIcebergCatalog(tblProperties.get(IcebergTable.ICEBERG_CATALOG));
     boolean isHadoopCatalog = fileFormat == HdfsFileFormat.ICEBERG &&
         icebergCatalog == TIcebergCatalog.HADOOP_CATALOG;
     if (location != null && !isHadoopCatalog) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index dd7c05e..1be3232 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -348,7 +348,7 @@ public interface FeIcebergTable extends FeFsTable {
      * Get all FileDescriptor from iceberg table without any predicates.
      */
     public static Map<String, HdfsPartition.FileDescriptor> loadAllPartition(
-        FeIcebergTable table) throws IOException {
+        FeIcebergTable table) throws IOException, TableLoadingException {
       // Empty predicates
       List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(table,
           new ArrayList<>());
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index b0f384a..e134d22 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -110,16 +110,17 @@ public class IcebergTable extends Table implements FeIcebergTable {
       Db db, String name, String owner) {
     super(msTable, db, name, owner);
     icebergTableLocation_ = msTable.getSd().getLocation();
-    icebergCatalog_ = IcebergUtil.getIcebergCatalog(msTable);
+    icebergCatalog_ = IcebergUtil.getTIcebergCatalog(msTable);
     icebergFileFormat_ = Utils.getIcebergFileFormat(msTable);
     hdfsTable_ = new HdfsTable(msTable, db, name, owner);
   }
 
   /**
-   * If managed table or external purge table , we create table by iceberg api,
-   * or we just create hms table.
+   * A table is a synchronized table if it is a managed table or an external table with
+   * the <code>external.table.purge</code> property set to true.
+   * We need to create/drop/etc. synchronized tables through the Iceberg APIs as well.
    */
-  public static boolean needsCreateInIceberg(
+  public static boolean isSynchronizedTable(
       org.apache.hadoop.hive.metastore.api.Table msTbl) {
     Preconditions.checkState(isIcebergTable(msTbl));
     return isManagedTable(msTbl) || isExternalPurgeTable(msTbl);
@@ -128,7 +129,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
   /**
    * Returns if this metastore table has managed table type
    */
-  private static boolean isManagedTable(
+  public static boolean isManagedTable(
       org.apache.hadoop.hive.metastore.api.Table msTbl) {
     return msTbl.getTableType().equalsIgnoreCase(TableType.MANAGED_TABLE.toString());
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
new file mode 100644
index 0000000..d12a123
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.TableLoadingException;
+
+/**
+ * Interface for Iceberg catalogs. Only contains a minimal set of methods to make
+ * it easy to add support for new Iceberg catalogs. Methods that can be implemented in a
+ * catalog-agnostic way should be placed in IcebergUtil.
+ */
+public interface IcebergCatalog {
+  /**
+   * Creates an Iceberg table in this catalog. Not every catalog uses every argument.
+   */
+  Table createTable(
+      TableIdentifier identifier,
+      Schema schema,
+      PartitionSpec spec,
+      String location,
+      Map<String, String> properties);
+
+  /**
+   * Loads the native Iceberg table that corresponds to 'feTable'.
+   */
+  Table loadTable(FeIcebergTable feTable) throws TableLoadingException;
+
+  /**
+   * Loads a native Iceberg table based on 'tableId' or 'tableLocation'.
+   * @param tableId is the Iceberg table identifier to load the table via the catalog
+   *     interface, e.g. HadoopCatalog.
+   * @param tableLocation is the filesystem path to load the table via the HadoopTables
+   *     interface.
+   */
+   Table loadTable(TableIdentifier tableId, String tableLocation)
+      throws TableLoadingException;
+
+  /**
+   * Drops the table from this catalog.
+   * If 'purge' is true, deletes all data and metadata files of the table.
+   * Returns true if the table was dropped, false if the table did not exist.
+   */
+  boolean dropTable(FeIcebergTable feTable, boolean purge);
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
new file mode 100644
index 0000000..0134e6c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.thrift.TIcebergCatalog;
+import org.apache.impala.util.IcebergUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of IcebergCatalog that delegates to Iceberg's HadoopCatalog.
+ */
+public class IcebergHadoopCatalog implements IcebergCatalog {
+  private HadoopCatalog hadoopCatalog;
+
+  public IcebergHadoopCatalog(String catalogLocation) {
+    hadoopCatalog = new HadoopCatalog(FileSystemUtil.getConfiguration(), catalogLocation);
+  }
+
+  @Override
+  public Table createTable(
+      TableIdentifier identifier,
+      Schema schema,
+      PartitionSpec spec,
+      String location,
+      Map<String, String> properties) {
+    // The 'location' argument is ignored: passing null lets Iceberg pick the location.
+    return hadoopCatalog.createTable(identifier, schema, spec, /*location=*/null,
+        properties);
+  }
+
+  @Override
+  public Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
+    Preconditions.checkState(
+      feTable.getIcebergCatalog() == TIcebergCatalog.HADOOP_CATALOG);
+    TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
+    return loadTable(tableId, /*tableLocation=*/null);
+  }
+
+  @Override
+  public Table loadTable(TableIdentifier tableId, String tableLocation)
+      throws TableLoadingException {
+    Preconditions.checkState(tableId != null);  // 'tableLocation' is unused here.
+    try {
+      return hadoopCatalog.loadTable(tableId);
+    } catch (NoSuchTableException e) {
+      throw new TableLoadingException(String.format(
+          "Failed to load Iceberg table with id: %s", tableId), e);
+    }
+  }
+
+  @Override
+  public boolean dropTable(FeIcebergTable feTable, boolean purge) {
+    Preconditions.checkState(
+      feTable.getIcebergCatalog() == TIcebergCatalog.HADOOP_CATALOG);
+    TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
+    return hadoopCatalog.dropTable(tableId, purge);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
new file mode 100644
index 0000000..9ff6481
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.thrift.TIcebergCatalog;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of IcebergCatalog that delegates to Iceberg's HadoopTables.
+ */
+public class IcebergHadoopTables implements IcebergCatalog {
+  private static IcebergHadoopTables instance_;  // Created lazily by getInstance().
+
+  public synchronized static IcebergHadoopTables getInstance() {
+    if (instance_ == null) {
+      instance_ = new IcebergHadoopTables();
+    }
+    return instance_;
+  }
+
+  private HadoopTables hadoopTables;
+
+  private IcebergHadoopTables() {
+    hadoopTables = new HadoopTables(FileSystemUtil.getConfiguration());
+  }
+
+  @Override
+  public Table createTable(
+      TableIdentifier identifier,  // Unused, the table is created at 'location'.
+      Schema schema,
+      PartitionSpec spec,
+      String location,
+      Map<String, String> properties) {
+    return hadoopTables.create(schema, spec, properties, location);
+  }
+
+  @Override
+  public Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
+    Preconditions.checkState(
+        feTable.getIcebergCatalog() == TIcebergCatalog.HADOOP_TABLES);
+    return loadTable(/*tableId=*/null, feTable.getLocation());
+  }
+
+  @Override
+  public Table loadTable(TableIdentifier tableId, String tableLocation)
+      throws TableLoadingException {
+    Preconditions.checkState(tableLocation != null);
+    try {
+      return hadoopTables.load(tableLocation);
+    } catch (NoSuchTableException e) {
+      throw new TableLoadingException(String.format(
+          "Failed to load Iceberg table at location: %s", tableLocation), e);
+    }
+  }
+
+  @Override
+  public boolean dropTable(FeIcebergTable feTable, boolean purge) {
+    Preconditions.checkState(
+      feTable.getIcebergCatalog() == TIcebergCatalog.HADOOP_TABLES);
+    if (purge) {
+      // TODO: HadoopTables doesn't have dropTable() in the Iceberg version being used,
+      // so nothing is deleted here for now. Un-comment the line below when our Iceberg
+      // version is newer than 0.9.1 and has the following commit:
+      // https://github.com/apache/iceberg/commit/66a37c2793392e6ce9d5d2783b64488527f079fc
+      //
+      // return hadoopTables.dropTable(feTable.getLocation());
+    }
+    return true;  // Always reports the table as dropped.
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index fa7e1f0..d121567 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -62,9 +62,9 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     Preconditions.checkNotNull(msTable);
     try {
       TableParams params = new TableParams(msTable);
-      String tableName = IcebergUtil.getIcebergTableIdentifier(msTable);
       TableMetadata metadata =
-          IcebergUtil.getIcebergTableMetadata(params.icebergCatalog_, tableName,
+          IcebergUtil.getIcebergTableMetadata(params.icebergCatalog_,
+              IcebergUtil.getIcebergTableIdentifier(msTable),
               params.icebergCatalogLocation_);
 
       return new LocalIcebergTable(db, msTable, ref, ColumnMap.fromMsTable(msTable),
@@ -191,7 +191,7 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
         throw new LocalCatalogException("Cannot find iceberg table location for table "
             + fullTableName);
       }
-      icebergCatalog_ = IcebergUtil.getIcebergCatalog(msTable);
+      icebergCatalog_ = IcebergUtil.getTIcebergCatalog(msTable);
 
       if (icebergCatalog_ == TIcebergCatalog.HADOOP_CATALOG) {
         icebergCatalogLocation_ = Utils.getIcebergCatalogLocation(msTable);
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index ec1e9f3..96c6fe7 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -42,6 +42,7 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -89,8 +90,14 @@ public class IcebergScanNode extends HdfsScanNode {
    */
   public List<FileDescriptor> getFileDescriptorByIcebergPredicates()
       throws ImpalaRuntimeException{
-    List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(icebergTable_,
-        icebergPredicates_);
+    List<DataFile> dataFileList;
+    try {
+      dataFileList = IcebergUtil.getIcebergDataFiles(icebergTable_, icebergPredicates_);
+    } catch (TableLoadingException e) {
+      throw new ImpalaRuntimeException(String.format(
+          "Failed to load data files for Iceberg table: %s", icebergTable_.getFullName()),
+          e);
+    }
 
     List<FileDescriptor> fileDescList = new ArrayList<>();
     for (DataFile dataFile : dataFileList) {
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ab71bf8..7f5aae2 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.impala.analysis.AlterTableSortByStmt;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.TableName;
@@ -85,6 +86,7 @@ import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.FeCatalogUtils;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
@@ -1898,6 +1900,14 @@ public class CatalogOpExecutor {
         KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
       }
 
+      if (msTbl != null &&
+          !(existingTbl instanceof IncompleteTable) &&
+          IcebergTable.isIcebergTable(msTbl) &&
+          IcebergTable.isSynchronizedTable(msTbl)) {
+        Preconditions.checkState(existingTbl instanceof IcebergTable);
+        IcebergCatalogOpExecutor.dropTable((IcebergTable)existingTbl, params.if_exists);
+      }
+
       // Check to make sure we don't drop a view with "drop table" statement and
       // vice versa. is_table field is marked optional in TDropTableOrViewParams to
       // maintain catalog api compatibility.
@@ -2616,10 +2626,10 @@ public class CatalogOpExecutor {
               msClient.getHiveClient().tableExists(newTable.getDbName(),
                   newTable.getTableName());
           if (!tableInMetastore) {
-            TIcebergCatalog catalog = IcebergUtil.getIcebergCatalog(newTable);
+            TIcebergCatalog catalog = IcebergUtil.getTIcebergCatalog(newTable);
             String location = newTable.getSd().getLocation();
             //Create table in iceberg if necessary
-            if (IcebergTable.needsCreateInIceberg(newTable)) {
+            if (IcebergTable.isSynchronizedTable(newTable)) {
               //Set location here if not been specified in sql
               if (location == null) {
                 if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
@@ -2635,17 +2645,19 @@ public class CatalogOpExecutor {
                 }
               }
               String tableLoc = IcebergCatalogOpExecutor.createTable(catalog,
-                  IcebergUtil.getIcebergTableIdentifier(newTable), location, params);
+                  IcebergUtil.getIcebergTableIdentifier(newTable), location, params)
+                  .location();
               newTable.getSd().setLocation(tableLoc);
             } else {
               if (location == null) {
                 if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
                   // When creating external Iceberg table with 'hadoop.catalog'
                   // We use catalog location and table identifier as location
-                  String identifier = IcebergUtil.getIcebergTableIdentifier(newTable);
+                  TableIdentifier identifier =
+                      IcebergUtil.getIcebergTableIdentifier(newTable);
                   newTable.getSd().setLocation(String.format("%s/%s/%s",
                       IcebergUtil.getIcebergCatalogLocation(newTable),
-                      identifier.split("\\.")[0], identifier.split("\\.")[1]));
+                      identifier.namespace().level(0), identifier.name()));
                 } else {
                   addSummary(response,
                       "Location is necessary for external iceberg table.");
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 6cac985..f0f0a1e 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -20,19 +20,21 @@ package org.apache.impala.service;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
 import org.apache.impala.catalog.ArrayType;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCreateTableParams;
@@ -56,39 +58,35 @@ public class IcebergCatalogOpExecutor {
    * Create Iceberg table by Iceberg api
    * Return value is table location from Iceberg
    */
-  public static String createTable(TIcebergCatalog catalog, String identifier,
+  public static Table createTable(TIcebergCatalog catalog, TableIdentifier identifier,
       String location, TCreateTableParams params) throws ImpalaRuntimeException {
     // Each table id increase from zero
     iThreadLocal.set(0);
     Schema schema = createIcebergSchema(params);
     PartitionSpec spec = IcebergUtil.createIcebergPartition(schema, params);
-    String tableLoc = null;
-    if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
-      tableLoc = createTableByHadoopCatalog(location, schema, spec, identifier);
-    } else {
-      Preconditions.checkArgument(catalog == TIcebergCatalog.HADOOP_TABLES);
-      tableLoc = createTableByHadoopTables(location, schema, spec);
-    }
+    IcebergCatalog icebergCatalog = IcebergUtil.getIcebergCatalog(catalog, location);
+    Table iceTable = icebergCatalog.createTable(identifier, schema, spec, location, null);
     LOG.info("Create iceberg table successful.");
-    return tableLoc;
+    return iceTable;
   }
 
-  // Create Iceberg table by HadoopTables
-  private static String createTableByHadoopTables(String metadataLoc, Schema schema,
-      PartitionSpec spec) {
-    HadoopTables tables = IcebergUtil.getHadoopTables();
-    BaseTable table = (BaseTable) tables.create(schema, spec, null, metadataLoc);
-    return table.location();
-  }
-
-  // Create Iceberg table by HadoopCatalog
-  private static String createTableByHadoopCatalog(String catalogLoc, Schema schema,
-      PartitionSpec spec, String identifier) {
-    // Each table id increase from zero
-    HadoopCatalog catalog = IcebergUtil.getHadoopCatalog(catalogLoc);
-    BaseTable table = (BaseTable) catalog.createTable(TableIdentifier.parse(identifier),
-        schema, spec, null);
-    return table.location();
+  /**
+   * Drops 'feTable' (which must be a synchronized table) from its Iceberg catalog.
+   * Throws TableNotFoundException if table is not found and 'ifExists' is false.
+   */
+  public static void dropTable(FeIcebergTable feTable, boolean ifExists)
+      throws TableNotFoundException, ImpalaRuntimeException {
+    Preconditions.checkState(
+        IcebergTable.isSynchronizedTable(feTable.getMetaStoreTable()));
+    IcebergCatalog iceCatalog = IcebergUtil.getIcebergCatalog(feTable);
+    if (!iceCatalog.dropTable(feTable,
+        /*purge=*/IcebergTable.isSynchronizedTable(feTable.getMetaStoreTable()))) {
+      // Nothing was dropped: the table didn't exist in the Iceberg catalog.
+      if (!ifExists) {
+        throw new TableNotFoundException(String.format(
+            "Table '%s' does not exist in Iceberg catalog.", feTable.getFullName()));
+      }
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index d5688dc..e773dcb 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -32,11 +32,10 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.UnboundPredicate;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
 import org.apache.iceberg.transforms.Transform;
@@ -53,7 +52,9 @@ import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
-import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog;
+import org.apache.impala.catalog.iceberg.IcebergHadoopTables;
+import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
@@ -64,89 +65,95 @@ import org.apache.impala.thrift.TIcebergPartitionTransform;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
 
 public class IcebergUtil {
-
-  /**
-   * Get HadoopTables by impala cluster related config
-   */
-  public static HadoopTables getHadoopTables() {
-    return new HadoopTables(FileSystemUtil.getConfiguration());
-  }
-
   /**
-   * Get HadoopCatalog by impala cluster related config
+   * Returns the corresponding catalog implementation for 'feTable'.
    */
-  public static HadoopCatalog getHadoopCatalog(String location) {
-    return new HadoopCatalog(FileSystemUtil.getConfiguration(), location);
+  public static IcebergCatalog getIcebergCatalog(FeIcebergTable feTable)
+      throws ImpalaRuntimeException {
+    return getIcebergCatalog(feTable.getIcebergCatalog(),
+        feTable.getIcebergCatalogLocation());
   }
 
   /**
-   * Get BaseTable from FeIcebergTable
+   * Returns the corresponding catalog implementation.
    */
-  public static BaseTable getBaseTable(FeIcebergTable table) {
-    return getBaseTable(table.getIcebergCatalog(), getIcebergTableIdentifier(table),
-        table.getIcebergCatalogLocation());
+  public static IcebergCatalog getIcebergCatalog(TIcebergCatalog catalog, String location)
+      throws ImpalaRuntimeException {
+    switch (catalog) {
+      case HADOOP_TABLES: return IcebergHadoopTables.getInstance();
+      case HADOOP_CATALOG: return new IcebergHadoopCatalog(location);
+      default: throw new ImpalaRuntimeException (
+          "Unexpected catalog type: " + catalog.toString());
+    }
   }
 
   /**
-   * Get BaseTable from each parameters
+   * Helper method to load native Iceberg table for 'feTable'.
    */
-  public static BaseTable getBaseTable(TIcebergCatalog catalog, String tableName,
-      String location) {
-    if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
-      return getBaseTableByHadoopCatalog(tableName, location);
-    } else {
-      // We use HadoopTables as default Iceberg catalog type
-      HadoopTables hadoopTables = IcebergUtil.getHadoopTables();
-      return (BaseTable) hadoopTables.load(location);
+  public static Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
+    try {
+      IcebergCatalog cat = getIcebergCatalog(feTable);
+      return cat.loadTable(feTable);
+    } catch (ImpalaRuntimeException e) {
+      throw new TableLoadingException(String.format(
+          "Failed to load Iceberg table: %s", feTable.getFullName()), e);
     }
   }
 
   /**
-   * Use location, namespace(database) and name(table) to get BaseTable by HadoopCatalog
+   * Helper method to load native Iceberg table.
    */
-  private static BaseTable getBaseTableByHadoopCatalog(String tableName,
-      String catalogLoc) {
-    HadoopCatalog hadoopCatalog = IcebergUtil.getHadoopCatalog(catalogLoc);
-    return (BaseTable) hadoopCatalog.loadTable(TableIdentifier.parse(tableName));
+  private static Table loadTable(TIcebergCatalog catalog, TableIdentifier tableId,
+      String location) throws TableLoadingException {
+    try {
+      IcebergCatalog cat = getIcebergCatalog(catalog, location);
+      return cat.loadTable(tableId, location);
+    } catch (ImpalaRuntimeException e) {
+      throw new TableLoadingException(String.format(
+          "Failed to load Iceberg table: %s at location: %s",
+          tableId, location), e);
+    }
   }
 
   /**
    * Get TableMetadata by FeIcebergTable
    */
-  public static TableMetadata getIcebergTableMetadata(FeIcebergTable table) {
-    return getIcebergTableMetadata(table.getIcebergCatalog(),
-        getIcebergTableIdentifier(table), table.getIcebergCatalogLocation());
+  public static TableMetadata getIcebergTableMetadata(FeIcebergTable table)
+      throws TableLoadingException {
+    BaseTable iceTable = (BaseTable)IcebergUtil.loadTable(table);
+    return iceTable.operations().current();
   }
 
   /**
-   * Get TableMetadata by related info
-   * tableName is table full name, usually database.table
+   * Get TableMetadata of the Iceberg table 'tableId' (usually database.table),
+   * loaded through the given catalog type and location.
    */
   public static TableMetadata getIcebergTableMetadata(TIcebergCatalog catalog,
-      String tableName, String location) {
-    BaseTable baseTable = getBaseTable(catalog, tableName, location);
+      TableIdentifier tableId, String location) throws TableLoadingException {
+    BaseTable baseTable = (BaseTable)IcebergUtil.loadTable(catalog,
+        tableId, location);
     return baseTable.operations().current();
   }
 
   /**
    * Get Iceberg table identifier by table property
    */
-  public static String getIcebergTableIdentifier(FeIcebergTable table) {
+  public static TableIdentifier getIcebergTableIdentifier(FeIcebergTable table) {
     return getIcebergTableIdentifier(table.getMetaStoreTable());
   }
 
-  public static String getIcebergTableIdentifier(
+  public static TableIdentifier getIcebergTableIdentifier(
       org.apache.hadoop.hive.metastore.api.Table msTable) {
     String name = msTable.getParameters().get(IcebergTable.ICEBERG_TABLE_IDENTIFIER);
     if (name == null || name.isEmpty()) {
-      return msTable.getDbName() + "." + msTable.getTableName();
+      return TableIdentifier.of(msTable.getDbName(), msTable.getTableName());
     }
 
     // If database not been specified in property, use default
     if (!name.contains(".")) {
-      return Catalog.DEFAULT_DB + "." + name;
+      return TableIdentifier.of(Catalog.DEFAULT_DB, name);
     }
-    return name;
+    return TableIdentifier.parse(name);
   }
 
   /**
@@ -192,9 +199,9 @@ public class IcebergUtil {
    * Get iceberg table catalog type from hms table properties
    * use HadoopCatalog as default
    */
-  public static TIcebergCatalog getIcebergCatalog(
+  public static TIcebergCatalog getTIcebergCatalog(
       org.apache.hadoop.hive.metastore.api.Table msTable) {
-    TIcebergCatalog catalog = getIcebergCatalog(
+    TIcebergCatalog catalog = getTIcebergCatalog(
         msTable.getParameters().get(IcebergTable.ICEBERG_CATALOG));
     return catalog == null ? TIcebergCatalog.HADOOP_CATALOG : catalog;
   }
@@ -202,7 +209,7 @@ public class IcebergUtil {
   /**
    * Get TIcebergCatalog from a string, usually from table properties
    */
-  public static TIcebergCatalog getIcebergCatalog(String catalog){
+  public static TIcebergCatalog getTIcebergCatalog(String catalog){
     if ("hadoop.tables".equalsIgnoreCase(catalog)) {
       return TIcebergCatalog.HADOOP_TABLES;
     } else if ("hadoop.catalog".equalsIgnoreCase(catalog)) {
@@ -437,8 +444,8 @@ public class IcebergUtil {
    * Get iceberg data file by file system table location and iceberg predicates
    */
   public static List<DataFile> getIcebergDataFiles(FeIcebergTable table,
-      List<UnboundPredicate> predicates) {
-    BaseTable baseTable = IcebergUtil.getBaseTable(table);
+      List<UnboundPredicate> predicates) throws TableLoadingException {
+    BaseTable baseTable = (BaseTable)IcebergUtil.loadTable(table);
     TableScan scan = baseTable.newScan();
     for (UnboundPredicate predicate : predicates) {
       scan = scan.filter(predicate);
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index dd31ea6..f89dbdf 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -16,7 +16,7 @@
 # under the License.
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-
+from tests.common.skip import SkipIf
 
 class TestCreatingIcebergTable(ImpalaTestSuite):
   """Test creating iceberg managed and external table"""
@@ -33,3 +33,15 @@ class TestCreatingIcebergTable(ImpalaTestSuite):
 
   def test_create_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-create', vector, use_db=unique_database)
+
+  @SkipIf.not_hdfs
+  def test_drop_incomplete_table(self, vector, unique_database):
+    """Test DROP TABLE when the underlying directory is deleted. In that case table
+    loading fails, but we should still be able to drop the table from Impala."""
+    tbl_name = unique_database + ".synchronized_iceberg_tbl"
+    cat_location = "/test-warehouse/" + unique_database
+    self.client.execute("""create table {0} (i int) stored as iceberg
+        tblproperties('iceberg.catalog'='hadoop.catalog',
+                      'iceberg.catalog_location'='{1}')""".format(tbl_name, cat_location))
+    self.hdfs_client.delete_file_dir(cat_location, True)
+    self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name))