Posted to commits@impala.apache.org by jo...@apache.org on 2020/10/30 17:03:10 UTC

[impala] 03/04: IMPALA-10166 (part 1): ALTER TABLE for Iceberg tables

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3e06d600c2dcb2c9bcdc4f52cd27cd5d180a900b
Author: skyyws <sk...@163.com>
AuthorDate: Sat Oct 10 15:14:08 2020 +0800

    IMPALA-10166 (part 1): ALTER TABLE for Iceberg tables
    
    This patch implements ALTER TABLE for Iceberg tables. We
    currently support these statements:
      * ADD COLUMNS
      * RENAME TABLE
      * SET TBL_PROPERTIES
      * SET OWNER
    We forbid DROP COLUMN/REPLACE COLUMNS/ALTER COLUMN in this
    patch, since these statements may make Iceberg tables unreadable.
    We may support column resolution by field id in the near future;
    after that, we will support DROP COLUMN/REPLACE COLUMNS/ALTER
    COLUMN for Iceberg tables.
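
    For illustration, a sketch of the supported statements (table
    name, columns, and property values here are hypothetical):

      ALTER TABLE ice_tbl ADD COLUMNS(event_time TIMESTAMP);
      ALTER TABLE ice_tbl RENAME TO ice_tbl_new;
      ALTER TABLE ice_tbl SET TBLPROPERTIES('some_key'='some_value');
      ALTER TABLE ice_tbl SET OWNER USER new_owner;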
    
    Here are some things that still need attention:
    1. RENAME TABLE is not supported for HadoopCatalog/HadoopTables,
    even though we already implement the 'RENAME TABLE' statement, so
    we only rename the table in the Hive Metastore for external
    tables (see the example after this list).
    2. We cannot ADD/DROP PARTITION yet since there is no API for that
    in Iceberg, but related work is already in progress in Iceberg.
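
    For example, renaming a table that uses 'hadoop.catalog' fails at
    the Iceberg layer (a sketch with a hypothetical table name; the
    error text matches the new iceberg-negative.test cases below):

      ALTER TABLE ice_tbl RENAME TO ice_tbl_new;
      -- UnsupportedOperationException: Cannot rename Iceberg tables
      -- that use 'hadoop.catalog' as catalog.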
    
    Testing:
    - Iceberg table alter test in test_iceberg.py
    - Iceberg table negative test in test_scanners.py
    - Rename tables in iceberg-negative.test
    
    Change-Id: I5104cc47c7b42dacdb52983f503cd263135d6bfc
    Reviewed-on: http://gerrit.cloudera.org:8080/16606
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../analysis/AlterTableAddPartitionStmt.java       |   4 +
 .../impala/analysis/AlterTableAlterColStmt.java    |  11 +++
 .../analysis/AlterTableDropPartitionStmt.java      |   4 +
 .../analysis/AlterTableRecoverPartitionsStmt.java  |   6 ++
 .../analysis/AlterTableSetFileFormatStmt.java      |   6 ++
 .../impala/analysis/AlterTableSetLocationStmt.java |   6 ++
 .../analysis/AlterTableSetRowFormatStmt.java       |   7 ++
 .../analysis/AlterTableSetTblProperties.java       |  23 ++++-
 .../org/apache/impala/analysis/AlterTableStmt.java |   5 -
 .../impala/catalog/iceberg/IcebergCatalog.java     |   7 ++
 .../catalog/iceberg/IcebergHadoopCatalog.java      |  11 +++
 .../catalog/iceberg/IcebergHadoopTables.java       |   7 ++
 .../apache/impala/service/CatalogOpExecutor.java   |  79 +++++++++++++++
 .../impala/service/IcebergCatalogOpExecutor.java   |  62 ++++++++++++
 .../java/org/apache/impala/util/IcebergUtil.java   |  73 ++++++++++++++
 .../queries/QueryTest/iceberg-alter.test           |  96 ++++++++++++++++++
 .../queries/QueryTest/iceberg-negative.test        | 109 ++++++++++++++++++---
 tests/query_test/test_iceberg.py                   |   3 +
 18 files changed, 497 insertions(+), 22 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
index 41b0308..efdd7e1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
@@ -20,6 +20,7 @@ package org.apache.impala.analysis;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Joiner;
 
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
@@ -84,6 +85,9 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt {
     if (table instanceof FeKuduTable) {
       throw new AnalysisException("ALTER TABLE ADD PARTITION is not supported for " +
           "Kudu tables: " + table.getTableName());
+    } else if (table instanceof FeIcebergTable) {
+      throw new AnalysisException("ALTER TABLE ADD PARTITION is not supported for " +
+          "Iceberg tables: " + table.getTableName());
     }
     Set<String> partitionSpecs = new HashSet<>();
     for (PartitionDef p: partitions_) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
index 5e6b5cd..6454e37 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.KuduColumn;
@@ -174,5 +175,15 @@ public class AlterTableAlterColStmt extends AlterTableStmt {
             "Altering the nullability of a column is not supported.");
       }
     }
+
+    if (t instanceof FeIcebergTable) {
+      // We cannot change a column from a primitive type to a complex type
+      // or vice versa.
+      if (t.getColumn(colName_).getType().isComplexType() ||
+          newColDef_.getType().isComplexType()) {
+        throw new AnalysisException(String.format("ALTER TABLE CHANGE COLUMN " +
+            "is not supported for complex types in Iceberg tables."));
+      }
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
index 3cb9ac9..cd52a29 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
@@ -78,6 +79,9 @@ public class AlterTableDropPartitionStmt extends AlterTableStmt {
     if (table instanceof FeKuduTable) {
       throw new AnalysisException("ALTER TABLE DROP PARTITION is not supported for " +
           "Kudu tables: " + partitionSet_.toSql());
+    } else if (table instanceof FeIcebergTable) {
+      throw new AnalysisException("ALTER TABLE DROP PARTITION is not supported for " +
+          "Iceberg tables: " + table.getFullName());
     }
     if (!ifExists_) partitionSet_.setPartitionShouldExist();
     partitionSet_.setPrivilegeRequirement(Privilege.ALTER);
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableRecoverPartitionsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableRecoverPartitionsStmt.java
index d04f042..fe2b617 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableRecoverPartitionsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableRecoverPartitionsStmt.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableType;
@@ -48,6 +49,11 @@ public class AlterTableRecoverPartitionsStmt extends AlterTableStmt {
           "must target an HDFS table: " + tableName_);
     }
 
+    if (table_ instanceof FeIcebergTable) {
+      throw new AnalysisException("ALTER TABLE RECOVER PARTITIONS is not supported " +
+          "on Iceberg tables: " + table_.getFullName());
+    }
+
     // Make sure the target table is partitioned.
     if (table_.getMetaStoreTable().getPartitionKeysSize() == 0) {
       throw new AnalysisException("Table is not partitioned: " + tableName_);
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetFileFormatStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetFileFormatStmt.java
index b88216c..a45da95 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetFileFormatStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetFileFormatStmt.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.analysis;
 
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
@@ -60,5 +61,10 @@ public class AlterTableSetFileFormatStmt extends AlterTableSetStmt {
       throw new AnalysisException("ALTER TABLE SET FILEFORMAT is not supported " +
           "on Kudu tables: " + tbl.getFullName());
     }
+
+    if (tbl instanceof FeIcebergTable) {
+      throw new AnalysisException("ALTER TABLE SET FILEFORMAT is not supported " +
+          "on Iceberg tables: " + tbl.getFullName());
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetLocationStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetLocationStmt.java
index cb46493..01898d1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetLocationStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetLocationStmt.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsPartition;
@@ -77,6 +78,11 @@ public class AlterTableSetLocationStmt extends AlterTableSetStmt {
 
     FeTable table = getTargetTable();
     Preconditions.checkNotNull(table);
+    if (table instanceof FeIcebergTable) {
+      throw new AnalysisException("ALTER TABLE SET LOCATION is not supported on Iceberg "
+          + "tables: " + table.getFullName());
+    }
+
     if (table instanceof FeFsTable) {
       FeFsTable hdfsTable = (FeFsTable) table;
       if (getPartitionSet() != null) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetRowFormatStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetRowFormatStmt.java
index a8c9fea..aab5012 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetRowFormatStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetRowFormatStmt.java
@@ -19,6 +19,7 @@ package org.apache.impala.analysis;
 
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.RowFormat;
@@ -62,6 +63,12 @@ public class AlterTableSetRowFormatStmt extends AlterTableSetStmt {
       throw new AnalysisException(String.format("ALTER TABLE SET ROW FORMAT is only " +
           "supported on HDFS tables. Conflicting table: %1$s", tbl.getFullName()));
     }
+
+    if (tbl instanceof FeIcebergTable) {
+      throw new AnalysisException("ALTER TABLE SET ROWFORMAT is not supported " +
+          "on Iceberg tables: " + tbl.getFullName());
+    }
+
     if (partitionSet_ != null) {
       for (FeFsPartition partition: partitionSet_.getPartitions()) {
         if (partition.getFileFormat() != HdfsFileFormat.TEXT &&
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index dce7fae..634c65b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
@@ -93,7 +95,11 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
           hive_metastoreConstants.META_TABLE_STORAGE));
     }
 
-    if (getTargetTable() instanceof FeKuduTable) analyzeKuduTable(analyzer);
+    if (getTargetTable() instanceof FeKuduTable) {
+      analyzeKuduTable(analyzer);
+    } else if (getTargetTable() instanceof FeIcebergTable) {
+      analyzeIcebergTable(analyzer);
+    }
 
     // Check avro schema when it is set in avro.schema.url or avro.schema.literal to
     // avoid potential metadata corruption (see IMPALA-2042).
@@ -139,6 +145,21 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
     }
   }
 
+  private void analyzeIcebergTable(Analyzer analyzer) throws AnalysisException {
+    // Cannot set these metadata-related properties.
+    icebergPropertyCheck(IcebergTable.ICEBERG_FILE_FORMAT);
+    icebergPropertyCheck(IcebergTable.ICEBERG_CATALOG);
+    icebergPropertyCheck(IcebergTable.ICEBERG_CATALOG_LOCATION);
+    icebergPropertyCheck(IcebergTable.ICEBERG_TABLE_IDENTIFIER);
+  }
+
+  private void icebergPropertyCheck(String property) throws AnalysisException {
+    if (tblProperties_.containsKey(property)) {
+      throw new AnalysisException(String.format("Changing the '%s' table property is " +
+          "not supported for Iceberg table.", property));
+    }
+  }
+
   /**
    * Check that Avro schema provided in avro.schema.url or avro.schema.literal is valid
    * Json and contains only supported Impala types. If both properties are set, then
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
index d4c3b13..249987e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeDataSourceTable;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableParams;
@@ -89,10 +88,6 @@ public abstract class AlterTableStmt extends StatementBase {
       throw new AnalysisException(String.format(
           "ALTER TABLE not allowed on a nested collection: %s", tableName_));
     }
-    if (tableRef.getTable() instanceof FeIcebergTable) {
-      throw new AnalysisException(String.format(
-          "ALTER TABLE not allowed on iceberg table: %s", tableName_));
-    }
     Preconditions.checkState(tableRef instanceof BaseTableRef);
     table_ = tableRef.getTable();
     analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
index d12a123..31a45fa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
@@ -62,4 +62,11 @@ public interface IcebergCatalog {
    * Return true if the table was dropped, false if the table did not exist
    */
   boolean dropTable(FeIcebergTable feTable, boolean purge);
+
+  /**
+   * Renames an Iceberg table.
+   * For HadoopTables, Iceberg does not provide a 'renameTable' method.
+   * For HadoopCatalog, 'renameTable' exists but throws UnsupportedOperationException.
+   */
+  void renameTable(FeIcebergTable feTable, TableIdentifier newTableId);
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
index f1177ea..411dd52 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
@@ -107,4 +107,15 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
     TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
     return hadoopCatalog.dropTable(tableId, purge);
   }
+
+  @Override
+  public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
+    TableIdentifier oldTableId = IcebergUtil.getIcebergTableIdentifier(feTable);
+    try {
+      hadoopCatalog.renameTable(oldTableId, newTableId);
+    } catch (UnsupportedOperationException e) {
+      throw new UnsupportedOperationException(
+          "Cannot rename Iceberg tables that use 'hadoop.catalog' as catalog.");
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
index 30dd658..4ebf1a7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
@@ -121,4 +121,11 @@ public class IcebergHadoopTables implements IcebergCatalog {
     }
     return true;
   }
+
+  @Override
+  public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
+    // Iceberg's HadoopTables API does not provide a renameTable method.
+    throw new UnsupportedOperationException(
+        "Cannot rename Iceberg tables that use 'hadoop.tables' as catalog.");
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 2c30bdd..97a8d69 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -685,6 +685,10 @@ public class CatalogOpExecutor {
       if (tbl instanceof KuduTable && altersKuduTable(params.getAlter_type())) {
         alterKuduTable(params, response, (KuduTable) tbl, newCatalogVersion);
         return;
+      } else if (tbl instanceof IcebergTable &&
+          altersIcebergTable(params.getAlter_type())) {
+        alterIcebergTable(params, response, (IcebergTable) tbl, newCatalogVersion);
+        return;
       }
       switch (params.getAlter_type()) {
         case ADD_COLUMNS:
@@ -932,6 +936,56 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Returns true if the given alteration type changes the underlying table stored in
+   * Iceberg in addition to the HMS table.
+   */
+  private boolean altersIcebergTable(TAlterTableType type) {
+    return type == TAlterTableType.ADD_COLUMNS
+        || type == TAlterTableType.REPLACE_COLUMNS
+        || type == TAlterTableType.DROP_COLUMN
+        || type == TAlterTableType.ALTER_COLUMN;
+  }
+
+  /**
+   * Executes the ALTER TABLE command for an Iceberg table and reloads its metadata.
+   */
+  private void alterIcebergTable(TAlterTableParams params, TDdlExecResponse response,
+      IcebergTable tbl, long newCatalogVersion) throws ImpalaException {
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    switch (params.getAlter_type()) {
+      case ADD_COLUMNS:
+        TAlterTableAddColsParams addColParams = params.getAdd_cols_params();
+        IcebergCatalogOpExecutor.addColumn(tbl, addColParams.getColumns());
+        addSummary(response, "Column(s) have been added.");
+        break;
+      case REPLACE_COLUMNS:
+        // TODO: support column resolution by field id first, then enable this
+        // statement. These cases fall through to 'default' and throw for now.
+      case DROP_COLUMN:
+        // TODO: support column resolution by field id first, then enable this
+        // statement.
+        //TAlterTableDropColParams dropColParams = params.getDrop_col_params();
+        //IcebergCatalogOpExecutor.dropColumn(tbl, dropColParams.getCol_name());
+        //addSummary(response, "Column has been dropped.");
+      case ALTER_COLUMN:
+        // TODO: support column resolution by field id first, then enable this
+        // statement.
+        //TAlterTableAlterColParams alterColParams = params.getAlter_col_params();
+        //IcebergCatalogOpExecutor.alterColumn(tbl, alterColParams.getCol_name(),
+        //    alterColParams.getNew_col_def());
+        //addSummary(response, "Column has been altered.");
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported ALTER TABLE operation for Iceberg tables: " +
+            params.getAlter_type());
+    }
+
+    loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " +
+        params.getAlter_type().name());
+    addTableToCatalogUpdate(tbl, response.result);
+  }
+
+  /**
    * Loads the metadata of a table 'tbl' and assigns a new catalog version.
    * 'reloadFileMetadata', 'reloadTableSchema', and 'partitionsToUpdate'
    * are used only for HdfsTables and control which metadata to reload.
@@ -3265,6 +3319,13 @@ public class CatalogOpExecutor {
           isKuduHmsIntegrationEnabled);
     }
 
+    // If oldTbl is a synchronized Iceberg table, rename the underlying Iceberg table.
+    boolean isSynchronizedIcebergTable = (oldTbl instanceof IcebergTable) &&
+        IcebergTable.isSynchronizedTable(msTbl);
+    if (isSynchronizedIcebergTable) {
+      renameManagedIcebergTable((IcebergTable) oldTbl, msTbl, newTableName);
+    }
+
     // Always updates the HMS metadata for non-Kudu tables. For Kudu tables, when
     // Kudu is not integrated with the Hive Metastore or if this is an external table,
     // Kudu will not automatically update the HMS metadata, we have to do it
@@ -3321,6 +3382,24 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Renames the underlying Iceberg table for the given managed table. If the new Iceberg
+   * table name is the same as the old Iceberg table name, this method does nothing.
+   */
+  private void renameManagedIcebergTable(IcebergTable oldTbl,
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      TableName newTableName) throws ImpalaRuntimeException {
+    TableIdentifier tableId = TableIdentifier.of(newTableName.getDb(),
+        newTableName.getTbl());
+    IcebergCatalogOpExecutor.renameTable(oldTbl, tableId);
+
+    if (msTbl.getParameters().get(IcebergTable.ICEBERG_TABLE_IDENTIFIER) != null) {
+      // Update the table identifier for HadoopCatalog-managed tables if it is set.
+      msTbl.getParameters().put(IcebergTable.ICEBERG_TABLE_IDENTIFIER,
+          tableId.toString());
+    }
+  }
+
+  /**
    * Changes the file format for the given table or partitions. This is a metadata only
    * operation, existing table data will not be converted to the new format. Returns
    * true if the file metadata to be reloaded.
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index e347f49..4ab4739 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -99,6 +100,67 @@ public class IcebergCatalogOpExecutor {
   }
 
   /**
+   * Adds one or more columns to an existing Iceberg table.
+   */
+  public static void addColumn(FeIcebergTable feTable, List<TColumn> columns)
+      throws TableLoadingException, ImpalaRuntimeException {
+    UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
+    for (TColumn column : columns) {
+      org.apache.iceberg.types.Type type =
+          IcebergUtil.fromImpalaColumnType(column.getColumnType());
+      schema.addColumn(column.getColumnName(), type, column.getComment());
+    }
+    schema.commit();
+  }
+
+  /**
+   * Updates a column of an Iceberg table.
+   * Iceberg only supports these type conversions:
+   *   INTEGER -> LONG
+   *   FLOAT -> DOUBLE
+   *   DECIMAL(p1,s) -> DECIMAL(p2,s), same scale, p1 <= p2
+   */
+  public static void alterColumn(FeIcebergTable feTable, String colName, TColumn newCol)
+      throws TableLoadingException, ImpalaRuntimeException {
+    UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
+    org.apache.iceberg.types.Type type =
+        IcebergUtil.fromImpalaColumnType(newCol.getColumnType());
+    // Cannot change a column to a complex type.
+    Preconditions.checkState(type.isPrimitiveType());
+    schema.updateColumn(colName, type.asPrimitiveType());
+
+    // Rename the column if the new name differs from the old one.
+    if (!colName.equals(newCol.getColumnName())) {
+      schema.renameColumn(colName, newCol.getColumnName());
+    }
+
+    // Update column comment if not empty
+    if (newCol.getComment() != null && !newCol.getComment().isEmpty()) {
+      schema.updateColumnDoc(colName, newCol.getComment());
+    }
+    schema.commit();
+  }
+
+  /**
+   * Drops a column from an Iceberg table.
+   */
+  public static void dropColumn(FeIcebergTable feTable, String colName)
+      throws TableLoadingException, ImpalaRuntimeException {
+    UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
+    schema.deleteColumn(colName);
+    schema.commit();
+  }
+
+  /**
+   * Renames an Iceberg table.
+   */
+  public static void renameTable(FeIcebergTable feTable, TableIdentifier tableId)
+      throws ImpalaRuntimeException {
+    IcebergCatalog catalog = IcebergUtil.getIcebergCatalog(feTable);
+    catalog.renameTable(feTable, tableId);
+  }
+
+  /**
    * Transform a StructField to Iceberg NestedField
    */
   private static Types.NestedField createIcebergNestedField(StructField structField)
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index c504060..efd4f87 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -28,6 +28,7 @@ import com.google.common.hash.Hashing;
 
 import org.apache.impala.common.Pair;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -57,6 +58,7 @@ import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHadoopTables;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -153,6 +155,15 @@ public class IcebergUtil {
   }
 
   /**
+   * Gets the Iceberg UpdateSchema for 'feTable'; UpdateSchema is used to update the
+   * schema of an Iceberg table.
+   */
+  public static UpdateSchema getIcebergUpdateSchema(FeIcebergTable feTable)
+      throws TableLoadingException, ImpalaRuntimeException {
+    return getIcebergCatalog(feTable).loadTable(feTable).updateSchema();
+  }
+
+  /**
    * Build iceberg PartitionSpec by parameters.
    * partition columns are all from source columns, this is different from hdfs table.
    */
@@ -385,6 +396,68 @@ public class IcebergUtil {
   }
 
   /**
+   * Gets the Iceberg type from an Impala column type.
+   */
+  public static org.apache.iceberg.types.Type fromImpalaColumnType(
+      TColumnType columnType) throws ImpalaRuntimeException {
+    return fromImpalaType(Type.fromThrift(columnType));
+  }
+
+  /**
+   * Transforms an Impala type to an Iceberg type.
+   */
+  public static org.apache.iceberg.types.Type fromImpalaType(Type t)
+      throws ImpalaRuntimeException {
+    if (t.isScalarType()) {
+      ScalarType st = (ScalarType) t;
+      switch (st.getPrimitiveType()) {
+        case BOOLEAN:
+          return Types.BooleanType.get();
+        case INT:
+          return Types.IntegerType.get();
+        case BIGINT:
+          return Types.LongType.get();
+        case FLOAT:
+          return Types.FloatType.get();
+        case DOUBLE:
+          return Types.DoubleType.get();
+        case STRING:
+          return Types.StringType.get();
+        case DATE:
+          return Types.DateType.get();
+        case BINARY:
+          return Types.BinaryType.get();
+        case TIMESTAMP:
+          return Types.TimestampType.withoutZone();
+        case DECIMAL:
+          return Types.DecimalType.of(st.decimalPrecision(), st.decimalScale());
+        default:
+          throw new ImpalaRuntimeException(String.format(
+              "Type %s is not supported in Iceberg", t.toSql()));
+      }
+    } else if (t.isArrayType()) {
+      ArrayType at = (ArrayType) t;
+      return Types.ListType.ofRequired(1, fromImpalaType(at.getItemType()));
+    } else if (t.isMapType()) {
+      MapType mt = (MapType) t;
+      return Types.MapType.ofRequired(1, 2,
+          fromImpalaType(mt.getKeyType()), fromImpalaType(mt.getValueType()));
+    } else if (t.isStructType()) {
+      StructType st = (StructType) t;
+      List<Types.NestedField> icebergFields = new ArrayList<>();
+      int id = 1;
+      for (StructField field : st.getFields()) {
+        icebergFields.add(Types.NestedField.required(id++, field.getName(),
+            fromImpalaType(field.getType()), field.getComment()));
+      }
+      return Types.StructType.of(icebergFields);
+    } else {
+      throw new ImpalaRuntimeException(String.format(
+          "Type %s is not supported in Iceberg", t.toSql()));
+    }
+  }
+
+  /**
    * Transform iceberg type to impala type
    */
   public static Type toImpalaType(org.apache.iceberg.types.Type t)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
new file mode 100644
index 0000000..21557cb
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
@@ -0,0 +1,96 @@
+====
+---- QUERY
+CREATE TABLE iceberg_test1(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
+ALTER TABLE iceberg_test1 ADD COLUMNS(event_time TIMESTAMP, register_time DATE);
+ALTER TABLE iceberg_test1 ADD COLUMNS(message STRING, price DECIMAL(8,1));
+ALTER TABLE iceberg_test1 ADD COLUMNS(map_test MAP <STRING, array <STRING>>, struct_test STRUCT <f1: BIGINT, f2: BIGINT>);
+DESCRIBE iceberg_test1;
+---- RESULTS
+'level','string',''
+'event_time','timestamp',''
+'register_time','date',''
+'message','string',''
+'price','decimal(8,1)',''
+'map_test','map<string,array<string>>',''
+'struct_test','struct<\n  f1:bigint,\n  f2:bigint\n>',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+ALTER TABLE iceberg_test1 set TBLPROPERTIES('fake_key'='fake_value');
+DESCRIBE FORMATTED iceberg_test1;
+---- RESULTS: VERIFY_IS_SUBSET
+'','fake_key            ','fake_value          '
+---- TYPES
+string, string, string
+====
+---- QUERY
+ALTER TABLE iceberg_test1 set OWNER USER fake_user;
+DESCRIBE FORMATTED iceberg_test1;
+---- RESULTS: VERIFY_IS_SUBSET
+'OwnerType:          ','USER                ','NULL'
+'Owner:              ','fake_user           ','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+ALTER TABLE iceberg_test1 set OWNER ROLE fake_role;
+DESCRIBE FORMATTED iceberg_test1;
+---- RESULTS: VERIFY_IS_SUBSET
+'OwnerType:          ','ROLE                ','NULL'
+'Owner:              ','fake_role           ','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+CREATE TABLE iceberg_test2(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/$DATABASE/hadoop_catalog_test');
+ALTER TABLE iceberg_test2 ADD COLUMNS(event_time TIMESTAMP, register_time DATE);
+ALTER TABLE iceberg_test2 ADD COLUMNS(message STRING, price DECIMAL(8,1));
+ALTER TABLE iceberg_test2 ADD COLUMNS(map_test MAP <STRING, array <STRING>>, struct_test STRUCT <f1: BIGINT, f2: BIGINT>);
+DESCRIBE iceberg_test2;
+---- RESULTS
+'level','string',''
+'event_time','timestamp',''
+'register_time','date',''
+'message','string',''
+'price','decimal(8,1)',''
+'map_test','map<string,array<string>>',''
+'struct_test','struct<\n  f1:bigint,\n  f2:bigint\n>',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+ALTER TABLE iceberg_test2 set TBLPROPERTIES('test_key'='test_value');
+DESCRIBE FORMATTED iceberg_test2;
+---- RESULTS: VERIFY_IS_SUBSET
+'','test_key            ','test_value          '
+---- TYPES
+string, string, string
+====
+---- QUERY
+ALTER TABLE iceberg_test2 set OWNER USER fake_user;
+DESCRIBE FORMATTED iceberg_test2;
+---- RESULTS: VERIFY_IS_SUBSET
+'OwnerType:          ','USER                ','NULL'
+'Owner:              ','fake_user           ','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+ALTER TABLE iceberg_test2 set OWNER ROLE fake_role;
+DESCRIBE FORMATTED iceberg_test2;
+---- RESULTS: VERIFY_IS_SUBSET
+'OwnerType:          ','ROLE                ','NULL'
+'Owner:              ','fake_role           ','NULL'
+---- TYPES
+string, string, string
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 2e2e79b..6f3dda5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -1,12 +1,12 @@
 ====
 ---- QUERY
-CREATE TABLE iceberg_test1
+CREATE TABLE iceberg_test
 STORED AS ICEBERG;
 ---- CATCH
 AnalysisException: Table requires at least 1 column for managed iceberg table.
 ====
 ---- QUERY
-CREATE TABLE iceberg_test2(
+CREATE TABLE iceberg_test(
   level STRING
 )
 PARTITION BY SPEC
@@ -19,21 +19,16 @@ STORED AS ICEBERG;
 AnalysisException: Cannot find source column: event_time
 ====
 ---- QUERY
-CREATE TABLE iceberg_test3(
+CREATE TABLE iceberg_table_hadoop_tables(
   level STRING
 )
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ====
 ---- QUERY
-TRUNCATE iceberg_test3
+TRUNCATE iceberg_table_hadoop_tables
 ---- CATCH
-AnalysisException: TRUNCATE TABLE not supported on iceberg table: $DATABASE.iceberg_test3
-====
----- QUERY
-ALTER TABLE iceberg_test3 ADD COLUMN event_time TIMESTAMP
----- CATCH
-AnalysisException: ALTER TABLE not allowed on iceberg table: iceberg_test3
+AnalysisException: TRUNCATE TABLE not supported on iceberg table: $DATABASE.iceberg_table_hadoop_tables
 ====
 ---- QUERY
 # iceberg_non_partitioned is not partitioned
@@ -42,7 +37,7 @@ SHOW PARTITIONS functional_parquet.iceberg_non_partitioned
 AnalysisException: Table is not partitioned: functional_parquet.iceberg_non_partitioned
 ====
 ---- QUERY
-CREATE TABLE iceberg_test4(
+CREATE TABLE iceberg_table_hadoop_catalog(
   level STRING
 )
 STORED AS ICEBERG
@@ -52,7 +47,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.catalog');
 AnalysisException: Location cannot be set for Iceberg table with 'hadoop.catalog'.
 ====
 ---- QUERY
-CREATE TABLE iceberg_test5(
+CREATE TABLE iceberg_table_hadoop_catalog(
   level STRING
 )
 STORED AS ICEBERG
@@ -61,7 +56,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.catalog');
 AnalysisException: Table property 'iceberg.catalog_location' is necessary for Iceberg table with 'hadoop.catalog'.
 ====
 ---- QUERY
-CREATE EXTERNAL TABLE iceberg_test6
+CREATE EXTERNAL TABLE iceberg_external_table_hadoop_catalog
 STORED AS ICEBERG
 LOCATION '/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
 TBLPROPERTIES('iceberg.catalog_location'='/test-warehouse/fake_table', 'iceberg.table_identifier'='fake_db.fake_table');
@@ -69,17 +64,17 @@ TBLPROPERTIES('iceberg.catalog_location'='/test-warehouse/fake_table', 'iceberg.
 AnalysisException: Location cannot be set for Iceberg table with 'hadoop.catalog'.
 ====
 ---- QUERY
-CREATE EXTERNAL TABLE iceberg_test7
+CREATE EXTERNAL TABLE iceberg_table_hadoop_catalog
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.table_identifier'='fake_db.fake_table');
 ---- CATCH
 AnalysisException: Table property 'iceberg.catalog_location' is necessary for Iceberg table with 'hadoop.catalog'.
 ====
 ---- QUERY
-CREATE EXTERNAL TABLE iceberg_test8
+CREATE EXTERNAL TABLE fake_iceberg_table_hadoop_catalog
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog_location'='/test-warehouse/fake_table', 'iceberg.table_identifier'='fake_db.fake_table');
-SHOW CREATE TABLE iceberg_test8;
+SHOW CREATE TABLE fake_iceberg_table_hadoop_catalog;
 ---- CATCH
 row_regex:.*CAUSED BY: TableLoadingException: Table does not exist: fake_db.fake_table*
 ====
@@ -118,3 +113,85 @@ INSERT INTO iceberg_partitioned_insert SELECT * FROM iceberg_partitioned_insert;
 ---- CATCH
 AnalysisException: Impala cannot write partitioned Iceberg tables.
 ====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_tables RENAME TO iceberg_table_hadoop_tables_new;
+---- CATCH
+UnsupportedOperationException: Cannot rename Iceberg tables that use 'hadoop.tables' as catalog.
+====
+---- QUERY
+CREATE TABLE iceberg_table_hadoop_catalog(
+  level STRING,
+  event_time TIMESTAMP
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/$DATABASE/hadoop_catalog_test');
+ALTER TABLE iceberg_table_hadoop_catalog RENAME TO iceberg_table_hadoop_catalog_new;
+---- CATCH
+UnsupportedOperationException: Cannot rename Iceberg tables that use 'hadoop.catalog' as catalog.
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog set TBLPROPERTIES('iceberg.file_format'='orc');
+---- CATCH
+AnalysisException: Changing the 'iceberg.file_format' table property is not supported for Iceberg tables.
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog set TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
+---- CATCH
+AnalysisException: Changing the 'iceberg.catalog' table property is not supported for Iceberg tables.
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog set TBLPROPERTIES('iceberg.catalog_location'='/fake_location');
+---- CATCH
+AnalysisException: Changing the 'iceberg.catalog_location' table property is not supported for Iceberg tables.
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog set TBLPROPERTIES('iceberg.table_identifier'='fake_db.fake_table');
+---- CATCH
+AnalysisException: Changing the 'iceberg.table_identifier' table property is not supported for Iceberg tables.
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog set FILEFORMAT PARQUET;
+---- CATCH
+AnalysisException: ALTER TABLE SET FILEFORMAT is not supported on Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog SET ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
+---- CATCH
+AnalysisException: ALTER TABLE SET ROWFORMAT is not supported on Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog SET LOCATION '/fake_location';
+---- CATCH
+AnalysisException: ALTER TABLE SET LOCATION is not supported on Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog ADD PARTITION(fake_col='fake_value');
+---- CATCH
+AnalysisException: ALTER TABLE ADD PARTITION is not supported for Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog DROP PARTITION(fake_col='fake_value');
+---- CATCH
+AnalysisException: ALTER TABLE DROP PARTITION is not supported for Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog RECOVER PARTITIONS;
+---- CATCH
+AnalysisException: ALTER TABLE RECOVER PARTITIONS is not supported on Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog DROP COLUMN level;
+---- CATCH
+UnsupportedOperationException: Unsupported ALTER TABLE operation for Iceberg tables: DROP_COLUMN
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog CHANGE COLUMN level level1 STRING;
+---- CATCH
+UnsupportedOperationException: Unsupported ALTER TABLE operation for Iceberg tables: ALTER_COLUMN
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog REPLACE COLUMNS(level INT, register_time DATE);
+---- CATCH
+UnsupportedOperationException: Unsupported ALTER TABLE operation for Iceberg tables: REPLACE_COLUMNS
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index ad39a19..5b772b3 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -38,6 +38,9 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_create_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-create', vector, use_db=unique_database)
 
+  def test_alter_iceberg_tables(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-alter', vector, use_db=unique_database)
+
   @SkipIf.not_hdfs
   def test_drop_incomplete_table(self, vector, unique_database):
     """Test DROP TABLE when the underlying directory is deleted. In that case table