Posted to commits@impala.apache.org by jo...@apache.org on 2021/07/15 18:24:47 UTC

[impala] branch master updated (9d46255 -> 474e022)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 9d46255  IMPALA-7087, IMPALA-8131: Read decimals from Parquet files with different precision/scale
     new d0749d5  IMPALA-10732: Use consistent DDL for specifying Iceberg partitions
     new 474e022  IMPALA-10626: Add support for Iceberg's Catalogs API

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 common/thrift/CatalogObjects.thrift                |   1 +
 fe/src/main/cup/sql-parser.cup                     |  32 ++--
 .../apache/impala/analysis/CreateTableStmt.java    |  29 ++--
 .../impala/analysis/IcebergPartitionField.java     |   4 +-
 .../impala/analysis/IcebergPartitionSpec.java      |  12 +-
 .../impala/analysis/IcebergPartitionTransform.java |  19 +++
 .../apache/impala/analysis/TableDataLayout.java    |   2 +-
 .../java/org/apache/impala/analysis/TableDef.java  |  12 +-
 .../org/apache/impala/analysis/ToSqlUtils.java     |   4 +-
 .../impala/catalog/iceberg/IcebergCatalog.java     |   9 +-
 .../impala/catalog/iceberg/IcebergCatalogs.java    | 172 +++++++++++++++++++++
 .../catalog/iceberg/IcebergHadoopCatalog.java      |  14 +-
 .../catalog/iceberg/IcebergHadoopTables.java       |   6 +-
 .../impala/catalog/iceberg/IcebergHiveCatalog.java |   6 +-
 .../impala/catalog/local/LocalIcebergTable.java    |   3 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  14 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   1 -
 .../java/org/apache/impala/util/IcebergUtil.java   |  85 +++++++---
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  25 ++-
 .../org/apache/impala/analysis/ParserTest.java     |   4 -
 fe/src/test/resources/hive-site.xml.py             |   9 ++
 .../functional/functional_schema_template.sql      |   4 +-
 .../queries/QueryTest/iceberg-catalogs.test        | 164 ++++++++++++++++++++
 .../queries/QueryTest/iceberg-create.test          |  64 ++++----
 .../queries/QueryTest/iceberg-ctas.test            |   8 +-
 .../queries/QueryTest/iceberg-negative.test        |  36 ++---
 .../queries/QueryTest/iceberg-overwrite.test       |   4 +-
 .../iceberg-partition-transform-insert.test        |  18 +--
 .../QueryTest/iceberg-partitioned-insert.test      |  26 ++--
 .../queries/QueryTest/iceberg-truncate.test        |   2 +-
 .../queries/QueryTest/show-create-table.test       | 106 ++++++-------
 tests/custom_cluster/test_event_processing.py      |   2 +-
 tests/query_test/test_iceberg.py                   |   3 +
 33 files changed, 671 insertions(+), 229 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test

[impala] 02/02: IMPALA-10626: Add support for Iceberg's Catalogs API

Posted by jo...@apache.org.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 474e022fda5f591bcd68da5212c0ca8ed17a18d1
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed May 12 17:49:55 2021 +0200

    IMPALA-10626: Add support for Iceberg's Catalogs API
    
    Iceberg recently switched to using its Catalogs class to define
    catalog and table properties. Catalog information is stored in
    a configuration file such as hive-site.xml, and the table properties
    identify which catalog is being used and what the Iceberg table
    identifier is.
    
    E.g. in the Hive conf we can have the following properties to define
    catalogs:
    
     iceberg.catalog.<catalog_name>.type = hadoop
     iceberg.catalog.<catalog_name>.warehouse = somelocation
    
     or
    
     iceberg.catalog.<catalog_name>.type = hive
    
    And at the table level we can have the following:
    
     iceberg.catalog = <catalog_name>
     name = <table_identifier>
    
    Table property 'iceberg.catalog' refers to a catalog defined in the
    configuration file. This conflicts with Impala's current behavior:
    Impala already uses 'iceberg.catalog', where it can have the
    following values:
    
     * hive.catalog for HiveCatalog
     * hadoop.catalog for HadoopCatalog
     * hadoop.tables for HadoopTables
    
    To stay backward-compatible while also supporting the new Catalogs
    properties, Impala still recognizes the above special values. However,
    from now on Impala no longer sets 'iceberg.catalog' by default. An
    unset (NULL) 'iceberg.catalog' means HiveCatalog for both Impala and
    Iceberg's Catalogs API, and hence for Hive and Spark as well.
    
    If 'iceberg.catalog' has any value other than these special values, it
    indicates that Iceberg's Catalogs API is being used, so Impala will
    try to look up the catalog configuration in the Hive config file.
    
    Testing:
     * added SHOW CREATE TABLE tests
     * added e2e tests that create/insert/drop Iceberg tables with Catalogs
     * manually tested interop behavior with Hive
    
    Change-Id: I5dfa150986117fc55b28034c4eda38a736460ead
    Reviewed-on: http://gerrit.cloudera.org:8080/17466
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                |   1 +
 .../apache/impala/analysis/CreateTableStmt.java    |  27 ++--
 .../java/org/apache/impala/analysis/TableDef.java  |  12 +-
 .../impala/catalog/iceberg/IcebergCatalog.java     |   9 +-
 .../impala/catalog/iceberg/IcebergCatalogs.java    | 172 +++++++++++++++++++++
 .../catalog/iceberg/IcebergHadoopCatalog.java      |  14 +-
 .../catalog/iceberg/IcebergHadoopTables.java       |   6 +-
 .../impala/catalog/iceberg/IcebergHiveCatalog.java |   6 +-
 .../impala/catalog/local/LocalIcebergTable.java    |   3 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  14 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   1 -
 .../java/org/apache/impala/util/IcebergUtil.java   |  64 ++++++--
 fe/src/test/resources/hive-site.xml.py             |   9 ++
 .../queries/QueryTest/iceberg-catalogs.test        | 164 ++++++++++++++++++++
 .../queries/QueryTest/show-create-table.test       |  34 +++-
 tests/query_test/test_iceberg.py                   |   3 +
 16 files changed, 490 insertions(+), 49 deletions(-)
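
The resolution rules described in the commit message can be sketched roughly as follows. This is an illustrative, self-contained sketch and not Impala's implementation: the class `IcebergCatalogResolutionSketch`, its method names, the case-sensitive matching, and the catalog name `ice_hadoop_cat` used in `main` are all assumptions. Only the 'iceberg.catalog' table property, its three special values, and the `iceberg.catalog.<catalog_name>.type` key pattern come from the message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how the 'iceberg.catalog' table property is interpreted.
final class IcebergCatalogResolutionSketch {
  // Mirrors the values of the TIcebergCatalog Thrift enum after this change.
  enum Kind { HADOOP_TABLES, HADOOP_CATALOG, HIVE_CATALOG, CATALOGS }

  // Maps the table property value to a catalog kind.
  // Assumption: matching is case-sensitive here for simplicity.
  static Kind fromTableProperty(String icebergCatalog) {
    // An unset or empty property means HiveCatalog.
    if (icebergCatalog == null || icebergCatalog.isEmpty()) return Kind.HIVE_CATALOG;
    switch (icebergCatalog) {
      case "hive.catalog": return Kind.HIVE_CATALOG;
      case "hadoop.catalog": return Kind.HADOOP_CATALOG;
      case "hadoop.tables": return Kind.HADOOP_TABLES;
      // Any other value is treated as the name of a catalog defined in the config.
      default: return Kind.CATALOGS;
    }
  }

  // Builds a config key of the form iceberg.catalog.<catalog_name>.<property>.
  static String configKey(String catalogName, String property) {
    return String.format("iceberg.catalog.%s.%s", catalogName, property);
  }

  // Looks up the catalog type in a plain map that stands in for the Hive config.
  static String catalogType(Map<String, String> conf, String catalogName) {
    return conf.get(configKey(catalogName, "type"));
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("iceberg.catalog.ice_hadoop_cat.type", "hadoop");
    conf.put("iceberg.catalog.ice_hadoop_cat.warehouse", "somelocation");

    System.out.println(fromTableProperty("ice_hadoop_cat"));
    System.out.println(catalogType(conf, "ice_hadoop_cat"));
  }
}
```

A table without the property, or with one of the three special values, never consults the configuration map; only a named catalog does.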

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index a477d80..6f3a07a 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -99,6 +99,7 @@ enum TIcebergCatalog {
   HADOOP_TABLES = 0
   HADOOP_CATALOG = 1
   HIVE_CATALOG = 2
+  CATALOGS = 3
 }
 
 enum TColumnEncoding {
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index cdc7fad..0edce29 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.iceberg.mr.Catalogs;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.IcebergTable;
@@ -129,11 +130,11 @@ public class CreateTableStmt extends StatementBase {
   Map<String, String> getSerdeProperties() { return tableDef_.getSerdeProperties(); }
   public THdfsFileFormat getFileFormat() { return tableDef_.getFileFormat(); }
   RowFormat getRowFormat() { return tableDef_.getRowFormat(); }
-  private void putGeneratedKuduProperty(String key, String value) {
-    tableDef_.putGeneratedKuduProperty(key, value);
+  private void putGeneratedProperty(String key, String value) {
+    tableDef_.putGeneratedProperty(key, value);
   }
   public Map<String, String> getGeneratedKuduProperties() {
-    return tableDef_.getGeneratedKuduProperties();
+    return tableDef_.getGeneratedProperties();
   }
 
   // Only exposed for ToSqlUtils. Returns the list of primary keys declared by the user
@@ -372,7 +373,7 @@ public class CreateTableStmt extends StatementBase {
       throw new AnalysisException("Invalid storage handler specified for Kudu table: " +
           handler);
     }
-    putGeneratedKuduProperty(KuduTable.KEY_STORAGE_HANDLER,
+    putGeneratedProperty(KuduTable.KEY_STORAGE_HANDLER,
         KuduTable.KUDU_STORAGE_HANDLER);
 
     String kuduMasters = getKuduMasters(analyzer);
@@ -381,7 +382,7 @@ public class CreateTableStmt extends StatementBase {
           "Table property '%s' is required when the impalad startup flag " +
           "-kudu_master_hosts is not used.", KuduTable.KEY_MASTER_HOSTS));
     }
-    putGeneratedKuduProperty(KuduTable.KEY_MASTER_HOSTS, kuduMasters);
+    putGeneratedProperty(KuduTable.KEY_MASTER_HOSTS, kuduMasters);
 
     // TODO: Find out what is creating a directory in HDFS and stop doing that. Kudu
     //       tables shouldn't have HDFS dirs: IMPALA-3570
@@ -497,7 +498,7 @@ public class CreateTableStmt extends StatementBase {
       throw new AnalysisException(String.format("Cannot analyze Kudu table '%s': %s",
           getTbl(), e.getMessage()));
     }
-    putGeneratedKuduProperty(KuduTable.KEY_TABLE_NAME,
+    putGeneratedProperty(KuduTable.KEY_TABLE_NAME,
         KuduUtil.getDefaultKuduTableName(getDb(), getTbl(), isHMSIntegrationEnabled));
   }
 
@@ -621,7 +622,7 @@ public class CreateTableStmt extends StatementBase {
       throw new AnalysisException("Invalid storage handler " +
           "specified for Iceberg format: " + handler);
     }
-    putGeneratedKuduProperty(IcebergTable.KEY_STORAGE_HANDLER,
+    putGeneratedProperty(IcebergTable.KEY_STORAGE_HANDLER,
         IcebergTable.ICEBERG_STORAGE_HANDLER);
 
     String fileformat = getTblProperties().get(IcebergTable.ICEBERG_FILE_FORMAT);
@@ -629,7 +630,7 @@ public class CreateTableStmt extends StatementBase {
       throw new AnalysisException("Invalid fileformat for Iceberg table: " + fileformat);
     }
     if (fileformat == null || fileformat.isEmpty()) {
-      putGeneratedKuduProperty(IcebergTable.ICEBERG_FILE_FORMAT, "parquet");
+      putGeneratedProperty(IcebergTable.ICEBERG_FILE_FORMAT, "parquet");
     }
 
     // Determine the Iceberg catalog being used. The default catalog is HiveCatalog.
@@ -637,7 +638,6 @@ public class CreateTableStmt extends StatementBase {
     TIcebergCatalog catalog;
     if (catalogStr == null || catalogStr.isEmpty()) {
       catalog = TIcebergCatalog.HIVE_CATALOG;
-      putGeneratedKuduProperty(IcebergTable.ICEBERG_CATALOG, "hive.catalog");
     } else {
       catalog = IcebergUtil.getTIcebergCatalog(catalogStr);
     }
@@ -659,6 +659,8 @@ public class CreateTableStmt extends StatementBase {
       break;
       case HADOOP_TABLES: validateTableInHadoopTables();
       break;
+      case CATALOGS: validateTableInCatalogs();
+      break;
       default: throw new AnalysisException(String.format(
           "Unknown Iceberg catalog type: %s", catalog));
     }
@@ -692,6 +694,13 @@ public class CreateTableStmt extends StatementBase {
     }
   }
 
+  private void validateTableInCatalogs() {
+    String tableId = getTblProperties().get(IcebergTable.ICEBERG_TABLE_IDENTIFIER);
+    if (tableId != null && !tableId.isEmpty()) {
+      putGeneratedProperty(Catalogs.NAME, tableId);
+    }
+  }
+
   /**
    * For iceberg table, partition column must be from source column
    */
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 4997fb1..dd1465f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -111,8 +111,8 @@ class TableDef {
   // True if analyze() has been called.
   private boolean isAnalyzed_ = false;
 
-  // Generated Kudu properties set during analysis.
-  private Map<String, String> generatedKuduProperties_ = new HashMap<>();
+  // Generated properties set during analysis. Currently used by Kudu and Iceberg.
+  private Map<String, String> generatedProperties_ = new HashMap<>();
 
   // END: Members that need to be reset()
   /////////////////////////////////////////
@@ -335,7 +335,7 @@ class TableDef {
     primaryKeyColDefs_.clear();
     columnDefs_.clear();
     isAnalyzed_ = false;
-    generatedKuduProperties_.clear();
+    generatedProperties_.clear();
   }
 
   public TableName getTblName() {
@@ -367,10 +367,10 @@ class TableDef {
   List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; }
   boolean isExternal() { return isExternal_; }
   boolean getIfNotExists() { return ifNotExists_; }
-  Map<String, String> getGeneratedKuduProperties() { return generatedKuduProperties_; }
-  void putGeneratedKuduProperty(String key, String value) {
+  Map<String, String> getGeneratedProperties() { return generatedProperties_; }
+  void putGeneratedProperty(String key, String value) {
     Preconditions.checkNotNull(key);
-    generatedKuduProperties_.put(key, value);
+    generatedProperties_.put(key, value);
   }
   List<KuduPartitionParam> getKuduPartitionParams() {
     return dataLayout_.getKuduPartitionParams();
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
index 31a45fa..9dd693d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.ImpalaRuntimeException;
 
 /**
  * Interface for Iceberg catalogs. Only contains a minimal set of methods to make
@@ -39,7 +40,7 @@ public interface IcebergCatalog {
       Schema schema,
       PartitionSpec spec,
       String location,
-      Map<String, String> properties);
+      Map<String, String> properties) throws ImpalaRuntimeException;
 
   /**
    * Loads a native Iceberg table based on the information in 'feTable'.
@@ -52,9 +53,11 @@ public interface IcebergCatalog {
    *     interface, e.g. HadoopCatalog.
    * @param tableLocation is the filesystem path to load the table via the HadoopTables
    *     interface.
+   * @param properties provides information for table loading when Iceberg Catalogs
+   *     is being used.
    */
-   Table loadTable(TableIdentifier tableId, String tableLocation)
-      throws TableLoadingException;
+   Table loadTable(TableIdentifier tableId, String tableLocation,
+      Map<String, String> properties) throws TableLoadingException;
 
   /**
    * Drops the table from this catalog.
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
new file mode 100644
index 0000000..56133dd
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.ConfigProperties;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTable;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TIcebergCatalog;
+import org.apache.impala.util.IcebergUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of IcebergCatalog for tables handled by Iceberg's Catalogs API.
+ */
+public class IcebergCatalogs implements IcebergCatalog {
+  private static IcebergCatalogs instance_;
+
+  public synchronized static IcebergCatalogs getInstance() {
+    if (instance_ == null) {
+      instance_ = new IcebergCatalogs();
+    }
+    return instance_;
+  }
+
+  private Configuration configuration_;
+
+  private IcebergCatalogs() {
+    configuration_ = new HiveConf(IcebergCatalogs.class);
+    // We need to set ENGINE_HIVE_ENABLED in order to get Iceberg use the
+    // appropriate SerDe and Input/Output format classes.
+    configuration_.setBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, true);
+  }
+
+  public TIcebergCatalog getUnderlyingCatalogType(String catalogName) {
+    String catalogType = configuration_.get(catalogPropertyConfigKey(
+        catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+    if (catalogType == null ||
+        CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType)) {
+      return TIcebergCatalog.HIVE_CATALOG;
+    }
+    if (CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP.equalsIgnoreCase(catalogType)) {
+      return TIcebergCatalog.HADOOP_CATALOG;
+    }
+    if (Catalogs.LOCATION.equalsIgnoreCase(catalogType)) {
+      return TIcebergCatalog.HADOOP_TABLES;
+    }
+    return TIcebergCatalog.CATALOGS;
+  }
+
+  @Override
+  public Table createTable(
+      TableIdentifier identifier,
+      Schema schema,
+      PartitionSpec spec,
+      String location,
+      Map<String, String> tableProps) throws ImpalaRuntimeException {
+    setContextClassLoader();
+    String catName = tableProps.get(IcebergTable.ICEBERG_CATALOG);
+    Preconditions.checkState(catName != null);
+    String catalogType = configuration_.get(catalogPropertyConfigKey(
+      catName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+    if (catalogType == null) {
+      throw new ImpalaRuntimeException(
+          String.format("Unknown catalog name: %s", catName));
+    }
+    Properties properties = createPropsForCatalogs(identifier, location, tableProps);
+    properties.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+    properties.setProperty(InputFormatConfig.PARTITION_SPEC,
+        PartitionSpecParser.toJson(spec));
+    return Catalogs.createTable(configuration_, properties);
+  }
+
+  @Override
+  public Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
+    setContextClassLoader();
+    TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
+    return loadTable(tableId, feTable.getLocation(),
+        feTable.getMetaStoreTable().getParameters());
+  }
+
+  @Override
+  public Table loadTable(TableIdentifier tableId, String tableLocation,
+      Map<String, String> tableProps) throws TableLoadingException {
+    setContextClassLoader();
+    Properties properties = createPropsForCatalogs(tableId, tableLocation, tableProps);
+    return Catalogs.loadTable(configuration_, properties);
+  }
+
+  @Override
+  public boolean dropTable(FeIcebergTable feTable, boolean purge) {
+    setContextClassLoader();
+    if (!purge) return true;
+    TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
+    String tableLocation = feTable.getLocation();
+    Properties properties = createPropsForCatalogs(tableId, tableLocation,
+        feTable.getMetaStoreTable().getParameters());
+    return Catalogs.dropTable(configuration_, properties);
+  }
+
+  @Override
+  public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
+    // Iceberg's Catalogs class has no renameTable() method
+    throw new UnsupportedOperationException(
+        "Cannot rename Iceberg tables that use 'Catalogs'.");
+  }
+
+  private Properties createPropsForCatalogs(TableIdentifier tableId, String location,
+      Map<String, String> tableProps) {
+    Properties properties = new Properties();
+    properties.putAll(tableProps);
+    if (tableId != null) {
+      properties.setProperty(Catalogs.NAME, tableId.toString());
+    } else if (location != null) {
+      properties.setProperty(Catalogs.LOCATION, location);
+    }
+    return properties;
+  }
+
+  private static String catalogPropertyConfigKey(String catalogName,
+      String catalogProperty) {
+    return String.format("%s%s.%s", InputFormatConfig.CATALOG_CONFIG_PREFIX,
+        catalogName, catalogProperty);
+  }
+
+  /**
+   * Some of the above methods might be running on native threads as they might be invoked
+   * via JNI. In that case the context class loader for those threads are null. 'Catalogs'
+   * uses JNDI to load the catalog implementations, e.g. HadoopCatalog or HiveCatalog.
+   * JNDI uses the context class loader, but as it is null it falls back to the bootstrap
+   * class loader that doesn't have the Iceberg classes on its classpath.
+   * To avoid ClassNotFoundException we set the context class loader to the class loader
+   * that loaded this class.
+   */
+  private void setContextClassLoader() {
+    if (Thread.currentThread().getContextClassLoader() != null) return;
+    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+  }
+}
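
The context class loader fallback described in the Javadoc of setContextClassLoader() can be tried in isolation. The following is a minimal sketch under stated assumptions, not code from the commit: the class `ContextClassLoaderSketch` and both of its methods are hypothetical, and a worker thread whose context class loader is explicitly cleared stands in for a native thread attached via JNI.

```java
// Hypothetical sketch of the "set the context class loader if it is missing" pattern.
final class ContextClassLoaderSketch {
  // Installs the class loader of 'owner' on the current thread, but only
  // when the thread has no context class loader yet.
  static void ensureContextClassLoader(Class<?> owner) {
    Thread current = Thread.currentThread();
    if (current.getContextClassLoader() != null) return;
    current.setContextClassLoader(owner.getClassLoader());
  }

  // Runs the fallback on a thread whose context class loader was cleared and
  // returns the loader the thread ends up with.
  static ClassLoader runOnThreadWithoutLoader() {
    final ClassLoader[] seen = new ClassLoader[1];
    Thread worker = new Thread(() -> {
      // Simulate a thread that has no context class loader.
      Thread.currentThread().setContextClassLoader(null);
      ensureContextClassLoader(ContextClassLoaderSketch.class);
      seen[0] = Thread.currentThread().getContextClassLoader();
    });
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for worker", e);
    }
    return seen[0];
  }

  public static void main(String[] args) {
    ClassLoader loader = runOnThreadWithoutLoader();
    System.out.println(loader == ContextClassLoaderSketch.class.getClassLoader());
  }
}
```

Because the helper returns early when a loader is already present, it leaves ordinary Java threads untouched and only affects threads that start without one.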
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
index 411dd52..287b004 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
@@ -18,7 +18,9 @@
 package org.apache.impala.catalog.iceberg;
 
 import java.lang.NullPointerException;
+import java.util.HashMap;
 import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -45,7 +47,11 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
   private HadoopCatalog hadoopCatalog;
 
   public IcebergHadoopCatalog(String catalogLocation) {
-    hadoopCatalog = new HadoopCatalog(FileSystemUtil.getConfiguration(), catalogLocation);
+    hadoopCatalog = new HadoopCatalog();
+    Map<String, String> props = new HashMap<>();
+    props.put(CatalogProperties.WAREHOUSE_LOCATION, catalogLocation);
+    hadoopCatalog.setConf(FileSystemUtil.getConfiguration());
+    hadoopCatalog.initialize("", props);
   }
 
   @Override
@@ -65,12 +71,12 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
     Preconditions.checkState(
       feTable.getIcebergCatalog() == TIcebergCatalog.HADOOP_CATALOG);
     TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
-    return loadTable(tableId, null);
+    return loadTable(tableId, null, null);
   }
 
   @Override
-  public Table loadTable(TableIdentifier tableId, String tableLocation)
-      throws TableLoadingException {
+  public Table loadTable(TableIdentifier tableId, String tableLocation,
+      Map<String, String> properties) throws TableLoadingException {
     Preconditions.checkState(tableId != null);
     final int MAX_ATTEMPTS = 5;
     final int SLEEP_MS = 500;
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
index 4ebf1a7..42017f3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
@@ -70,12 +70,12 @@ public class IcebergHadoopTables implements IcebergCatalog {
   public Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
     Preconditions.checkState(
         feTable.getIcebergCatalog() == TIcebergCatalog.HADOOP_TABLES);
-    return loadTable(null, feTable.getLocation());
+    return loadTable(null, feTable.getLocation(), null);
   }
 
   @Override
-  public Table loadTable(TableIdentifier tableId, String tableLocation)
-      throws TableLoadingException {
+  public Table loadTable(TableIdentifier tableId, String tableLocation,
+      Map<String, String> properties) throws TableLoadingException {
     Preconditions.checkState(tableLocation != null);
     final int MAX_ATTEMPTS = 5;
     final int SLEEP_MS = 500;
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
index e90e3be..3841277 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
@@ -70,12 +70,12 @@ public class IcebergHiveCatalog implements IcebergCatalog {
     Preconditions.checkState(
         feTable.getIcebergCatalog() == TIcebergCatalog.HIVE_CATALOG);
     TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
-    return loadTable(tableId, null);
+    return loadTable(tableId, null, null);
   }
 
   @Override
-  public Table loadTable(TableIdentifier tableId, String tableLocation)
-      throws TableLoadingException {
+  public Table loadTable(TableIdentifier tableId, String tableLocation,
+      Map<String, String> properties) throws TableLoadingException {
     Preconditions.checkState(tableId != null);
     try {
       return hiveCatalog_.loadTable(tableId);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 463deab..0de145e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -71,7 +71,8 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
       TableMetadata metadata =
           IcebergUtil.getIcebergTableMetadata(params.icebergCatalog_,
               IcebergUtil.getIcebergTableIdentifier(msTable),
-              params.icebergCatalogLocation_);
+              params.icebergCatalogLocation_,
+              msTable.getParameters());
       List<Column> iceColumns =
           IcebergSchemaConverter.convertToImpalaSchema(metadata.schema());
       validateColumns(iceColumns, msTable.getSd().getCols());
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3b77cc9..18b8cd5 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2392,7 +2392,7 @@ public class CatalogOpExecutor {
     Preconditions.checkState(IcebergTable.isIcebergTable(msTbl));
     // Only synchronized tables can be integrated.
     if (!IcebergTable.isSynchronizedTable(msTbl)) return false;
-    return IcebergUtil.getTIcebergCatalog(msTbl) == TIcebergCatalog.HIVE_CATALOG;
+    return IcebergUtil.isHiveCatalog(msTbl);
   }
 
   /**
@@ -3361,16 +3361,18 @@ public class CatalogOpExecutor {
               newTable.getSd().setLocation(tableLoc);
             } else {
               if (location == null) {
-                if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
-                  // When creating external Iceberg table with 'hadoop.catalog' we load
-                  // the Iceberg table using catalog location and table identifier to get
+                if (IcebergUtil.getUnderlyingCatalog(newTable) !=
+                    TIcebergCatalog.HADOOP_TABLES) {
+                  // When creating external Iceberg table we load
+                  // the Iceberg table using catalog and table identifier to get
                   // the actual location of the table. This way we can also get the
                   // correct location for tables stored in nested namespaces.
                   TableIdentifier identifier =
                       IcebergUtil.getIcebergTableIdentifier(newTable);
                   newTable.getSd().setLocation(IcebergUtil.loadTable(
-                      TIcebergCatalog.HADOOP_CATALOG, identifier,
-                      IcebergUtil.getIcebergCatalogLocation(newTable)).location());
+                      catalog, identifier,
+                      IcebergUtil.getIcebergCatalogLocation(newTable),
+                      newTable.getParameters()).location());
                 } else {
                   addSummary(response,
                       "Location is necessary for external iceberg table.");
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 5981ddc..bdabde8 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -179,7 +179,6 @@ public class IcebergCatalogOpExecutor {
    */
   private static boolean isHmsOnlyProperty(String propKey) {
     if (IcebergTable.ICEBERG_FILE_FORMAT.equals(propKey)) return true;
-    if (IcebergTable.ICEBERG_CATALOG.equals(propKey)) return true;
     if (IcebergTable.ICEBERG_CATALOG_LOCATION.equals(propKey)) return true;
     if (IcebergTable.ICEBERG_TABLE_IDENTIFIER.equals(propKey)) return true;
     if (CatalogOpExecutor.CAPABILITIES_KEY.equals(propKey)) return true;
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 48c6789..5c4ff03 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -49,6 +50,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Conversions;
@@ -66,6 +68,7 @@ import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHadoopTables;
 import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
+import org.apache.impala.catalog.iceberg.IcebergCatalogs;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
@@ -100,6 +103,7 @@ public class IcebergUtil {
       case HADOOP_TABLES: return IcebergHadoopTables.getInstance();
       case HIVE_CATALOG: return IcebergHiveCatalog.getInstance();
       case HADOOP_CATALOG: return new IcebergHadoopCatalog(location);
+      case CATALOGS: return IcebergCatalogs.getInstance();
       default: throw new ImpalaRuntimeException (
           "Unexpected catalog type: " + catalog.toString());
     }
@@ -110,17 +114,17 @@ public class IcebergUtil {
    */
   public static Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
     return loadTable(feTable.getIcebergCatalog(), getIcebergTableIdentifier(feTable),
-        feTable.getIcebergCatalogLocation());
+        feTable.getIcebergCatalogLocation(), feTable.getMetaStoreTable().getParameters());
   }
 
   /**
    * Helper method to load native Iceberg table.
    */
   public static Table loadTable(TIcebergCatalog catalog, TableIdentifier tableId,
-      String location) throws TableLoadingException {
+      String location, Map<String, String> tableProps) throws TableLoadingException {
     try {
       IcebergCatalog cat = getIcebergCatalog(catalog, location);
-      return cat.loadTable(tableId, location);
+      return cat.loadTable(tableId, location, tableProps);
     } catch (ImpalaRuntimeException e) {
       throw new TableLoadingException(String.format(
           "Failed to load Iceberg table: %s at location: %s",
@@ -142,9 +146,10 @@ public class IcebergUtil {
    * database.table
    */
   public static TableMetadata getIcebergTableMetadata(TIcebergCatalog catalog,
-      TableIdentifier tableId, String location) throws TableLoadingException {
+      TableIdentifier tableId, String location, Map<String, String> tableProps)
+      throws TableLoadingException {
     BaseTable baseTable = (BaseTable)IcebergUtil.loadTable(catalog,
-        tableId, location);
+        tableId, location, tableProps);
     return baseTable.operations().current();
   }
 
@@ -159,6 +164,10 @@ public class IcebergUtil {
       org.apache.hadoop.hive.metastore.api.Table msTable) {
     String name = msTable.getParameters().get(IcebergTable.ICEBERG_TABLE_IDENTIFIER);
     if (name == null || name.isEmpty()) {
+      // Iceberg's Catalogs API uses table property 'name' for the table id.
+      name = msTable.getParameters().get(Catalogs.NAME);
+    }
+    if (name == null || name.isEmpty()) {
       return TableIdentifier.of(msTable.getDbName(), msTable.getTableName());
     }
 
@@ -216,14 +225,28 @@ public class IcebergUtil {
   }
 
   /**
+   * Returns true if 'msTable' uses HiveCatalog.
+   */
+  public static boolean isHiveCatalog(
+      org.apache.hadoop.hive.metastore.api.Table msTable) {
+    TIcebergCatalog tCat = getTIcebergCatalog(msTable);
+    if (tCat == TIcebergCatalog.HIVE_CATALOG) return true;
+    if (tCat == TIcebergCatalog.CATALOGS) {
+      String catName = msTable.getParameters().get(IcebergTable.ICEBERG_CATALOG);
+      tCat = IcebergCatalogs.getInstance().getUnderlyingCatalogType(catName);
+      return tCat == TIcebergCatalog.HIVE_CATALOG;
+    }
+    return false;
+  }
+
+  /**
    * Get iceberg table catalog type from hms table properties
    * use HiveCatalog as default
    */
   public static TIcebergCatalog getTIcebergCatalog(
       org.apache.hadoop.hive.metastore.api.Table msTable) {
-    TIcebergCatalog catalog = getTIcebergCatalog(
+    return getTIcebergCatalog(
         msTable.getParameters().get(IcebergTable.ICEBERG_CATALOG));
-    return catalog == null ? TIcebergCatalog.HIVE_CATALOG : catalog;
   }
 
   /**
@@ -234,10 +257,33 @@ public class IcebergUtil {
       return TIcebergCatalog.HADOOP_TABLES;
     } else if ("hadoop.catalog".equalsIgnoreCase(catalog)) {
       return TIcebergCatalog.HADOOP_CATALOG;
-    } else if ("hive.catalog".equalsIgnoreCase(catalog)) {
+    } else if ("hive.catalog".equalsIgnoreCase(catalog) ||
+               catalog == null) {
       return TIcebergCatalog.HIVE_CATALOG;
     }
-    return null;
+    return TIcebergCatalog.CATALOGS;
+  }
+
+  /**
+   * Returns the underlying Iceberg catalog when Iceberg Catalogs is being used;
+   * otherwise simply returns the Iceberg catalog.
+   */
+  public static TIcebergCatalog getUnderlyingCatalog(
+      org.apache.hadoop.hive.metastore.api.Table msTable) {
+    return getUnderlyingCatalog(
+        msTable.getParameters().get(IcebergTable.ICEBERG_CATALOG));
+  }
+
+  /**
+   * Returns the underlying Iceberg catalog when Iceberg Catalogs is being used;
+   * otherwise simply returns the Iceberg catalog.
+   */
+  public static TIcebergCatalog getUnderlyingCatalog(String catalog) {
+    TIcebergCatalog tCat = getTIcebergCatalog(catalog);
+    if (tCat == TIcebergCatalog.CATALOGS) {
+      return IcebergCatalogs.getInstance().getUnderlyingCatalogType(catalog);
+    }
+    return tCat;
   }
 
   /**
diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py
index c14f982..311ec89 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -167,6 +167,15 @@ CONFIG.update({
  'hive.metastore.dml.events': 'true',
 })
 
+# Add Iceberg catalog configurations.
+CONFIG.update({
+  # Hive catalog:
+  'iceberg.catalog.ice_hive_cat.type': 'hive',
+  # Hadoop catalog:
+  'iceberg.catalog.ice_hadoop_cat.type': 'hadoop',
+  'iceberg.catalog.ice_hadoop_cat.warehouse': '${WAREHOUSE_LOCATION_PREFIX}/test-warehouse/ice_hadoop_cat',
+})
+
 if variant == 'without_hms_config':
   CONFIG.clear()
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
new file mode 100644
index 0000000..92fad47
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
@@ -0,0 +1,164 @@
+====
+---- QUERY
+CREATE TABLE iceberg_hadoop_catalogs(
+  label STRING,
+  val decimal(10,2)
+)
+PARTITIONED BY SPEC(label)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='ice_hadoop_cat');
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_hadoop_catalogs;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/ice_hadoop_cat/$DATABASE/iceberg_hadoop_catalogs','NULL'
+'','iceberg.file_format ','parquet             '
+'','iceberg.catalog     ','ice_hadoop_cat      '
+---- TYPES
+string, string, string
+====
+---- QUERY
+CREATE TABLE iceberg_hadoop_catalogs_with_id(
+  label STRING,
+  val decimal(10,2)
+)
+PARTITIONED BY SPEC(label)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='ice_hadoop_cat',
+'iceberg.table_identifier'='org.db.tbl');
+DESCRIBE FORMATTED iceberg_hadoop_catalogs_with_id;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/ice_hadoop_cat/org/db/tbl','NULL'
+'','iceberg.file_format ','parquet             '
+'','iceberg.catalog     ','ice_hadoop_cat      '
+'','iceberg.table_identifier','org.db.tbl          '
+'','name                ','org.db.tbl          '
+---- TYPES
+string, string, string
+====
+---- QUERY
+INSERT INTO iceberg_hadoop_catalogs_with_id values ('ice', 3.14);
+SELECT * from iceberg_hadoop_catalogs_with_id;
+---- RESULTS
+'ice',3.14
+---- TYPES
+STRING,DECIMAL
+====
+---- QUERY
+SHOW FILES IN iceberg_hadoop_catalogs_with_id;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/ice_hadoop_cat/org/db/tbl/data/label=ice/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_hadoop_cat_with_id_ext
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='ice_hadoop_cat',
+'iceberg.table_identifier'='org.db.tbl');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_hadoop_cat_with_id_ext;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/ice_hadoop_cat/org/db/tbl','NULL'
+'','iceberg.file_format ','parquet             '
+'','iceberg.catalog     ','ice_hadoop_cat      '
+'','iceberg.table_identifier','org.db.tbl          '
+'','name                ','org.db.tbl          '
+---- TYPES
+string, string, string
+====
+---- QUERY
+SELECT * FROM iceberg_hadoop_cat_with_id_ext;
+---- RESULTS
+'ice',3.14
+---- TYPES
+STRING,DECIMAL
+====
+---- QUERY
+DROP TABLE iceberg_hadoop_cat_with_id_ext;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+REFRESH iceberg_hadoop_catalogs_with_id;
+SELECT * FROM iceberg_hadoop_catalogs_with_id;
+---- RESULTS
+'ice',3.14
+---- TYPES
+STRING,DECIMAL
+====
+---- QUERY
+CREATE TABLE iceberg_hive_catalogs(
+  label STRING,
+  val decimal(10,2)
+)
+PARTITIONED BY SPEC(label)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='ice_hive_cat');
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_hive_catalogs;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL'
+'','iceberg.file_format ','parquet             '
+'','iceberg.catalog     ','ice_hive_cat        '
+---- TYPES
+string, string, string
+====
+---- QUERY
+INSERT INTO iceberg_hive_catalogs values ('ice', 3.14);
+SELECT * from iceberg_hive_catalogs;
+---- RESULTS
+'ice',3.14
+---- TYPES
+STRING,DECIMAL
+====
+---- QUERY
+SHOW FILES IN iceberg_hive_catalogs;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs/data/label=ice/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_hive_catalogs_ext
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='ice_hive_cat',
+'iceberg.table_identifier'='$DATABASE.iceberg_hive_catalogs');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_hive_catalogs_ext;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL'
+'','iceberg.file_format ','parquet             '
+'','iceberg.catalog     ','ice_hive_cat        '
+'','iceberg.table_identifier','$DATABASE.iceberg_hive_catalogs'
+'','name                ','$DATABASE.iceberg_hive_catalogs'
+---- TYPES
+string, string, string
+====
+---- QUERY
+SELECT * FROM iceberg_hive_catalogs_ext;
+---- RESULTS
+'ice',3.14
+---- TYPES
+STRING,DECIMAL
+====
+---- QUERY
+DROP TABLE iceberg_hive_catalogs_ext;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+REFRESH iceberg_hive_catalogs;
+SELECT * FROM iceberg_hive_catalogs;
+---- RESULTS
+'ice',3.14
+---- TYPES
+STRING,DECIMAL
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 8dcc4b6..10f2b4c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -703,7 +703,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_default_tbl (
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('iceberg.file_format'='parquet', 'iceberg.catalog'='hive.catalog',
+TBLPROPERTIES ('iceberg.file_format'='parquet',
 'external.table.purge'='TRUE', 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -719,7 +719,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_default_tbl_orc (
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('iceberg.file_format'='orc', 'iceberg.catalog'='hive.catalog',
+TBLPROPERTIES ('iceberg.file_format'='orc',
 'external.table.purge'='TRUE', 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -780,7 +780,7 @@ PARTITIONED BY SPEC (p, d)
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
-'iceberg.catalog'='hive.catalog', 'table_type'='ICEBERG')
+'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_ctas
@@ -797,7 +797,7 @@ PARTITIONED BY SPEC (BUCKET(5, id))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
-'iceberg.catalog'='hive.catalog', 'table_type'='ICEBERG')
+'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_ctas_ht
@@ -817,3 +817,29 @@ LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
 'iceberg.catalog'='hadoop.tables')
 ====
+---- CREATE_TABLE
+CREATE TABLE iceberg_catalogs_hive (i int)
+PARTITIONED BY SPEC (bucket(3, i))
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.catalog'='ice_hive_cat')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_catalogs_hive (i INT NULL)
+PARTITIONED BY SPEC (BUCKET(3, i))
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+'iceberg.catalog'='ice_hive_cat', 'table_type'='ICEBERG')
+====
+---- CREATE_TABLE
+CREATE TABLE iceberg_catalogs_hadoop (i int)
+PARTITIONED BY SPEC (bucket(3, i))
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.catalog'='ice_hadoop_cat')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_catalogs_hadoop (i INT NULL)
+PARTITIONED BY SPEC (BUCKET(3, i))
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+'iceberg.catalog'='ice_hadoop_cat')
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 65c65bb..08b0601 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -86,6 +86,9 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_iceberg_orc_field_id(self, vector):
     self.run_test_case('QueryTest/iceberg-orc-field-id', vector)
 
+  def test_catalogs(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-catalogs', vector, use_db=unique_database)
+
   def test_describe_history(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-table-history', vector, use_db=unique_database)
 

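Note on the IcebergUtil.java change above: the reworked getTIcebergCatalog(String) maps the 'iceberg.catalog' table property to a catalog type, and any unrecognized name now falls through to CATALOGS instead of null. The following is a minimal standalone sketch of that mapping, not the Impala code itself. The class name CatalogTypeSketch and the nested enum are hypothetical stand-ins for TIcebergCatalog, and the "hadoop.tables" comparison is an assumption, since that condition lies outside the hunk shown.

```java
// Hypothetical stand-in for the mapping in IcebergUtil.getTIcebergCatalog(String).
final class CatalogTypeSketch {
  // Mirrors the TIcebergCatalog values referenced in the diff.
  enum CatalogType { HADOOP_TABLES, HADOOP_CATALOG, HIVE_CATALOG, CATALOGS }

  static CatalogType resolve(String catalog) {
    // Assumption: the "hadoop.tables" check is not visible in the hunk.
    if ("hadoop.tables".equalsIgnoreCase(catalog)) return CatalogType.HADOOP_TABLES;
    if ("hadoop.catalog".equalsIgnoreCase(catalog)) return CatalogType.HADOOP_CATALOG;
    // A missing property now defaults to HiveCatalog inside this method.
    if (catalog == null || "hive.catalog".equalsIgnoreCase(catalog)) {
      return CatalogType.HIVE_CATALOG;
    }
    // Any other name is treated as a catalog configured via Iceberg's Catalogs API.
    return CatalogType.CATALOGS;
  }

  public static void main(String[] args) {
    System.out.println(resolve(null));              // HIVE_CATALOG
    System.out.println(resolve("hadoop.catalog"));  // HADOOP_CATALOG
    System.out.println(resolve("ice_hadoop_cat"));  // CATALOGS
  }
}
```

Because a named catalog such as ice_hive_cat resolves to CATALOGS, callers that need the concrete backing type have to ask for the underlying catalog, which is what isHiveCatalog() and getUnderlyingCatalog() do in the diff.
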
[impala] 01/02: IMPALA-10732: Use consistent DDL for specifying Iceberg partitions

Posted by jo...@apache.org.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d0749d59de7c81a2f138986626fef12ea44e6a2c
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Jun 10 10:39:52 2021 +0200

    IMPALA-10732: Use consistent DDL for specifying Iceberg partitions
    
    Currently we have a DDL syntax for defining Iceberg partitions that
    differs from SparkSQL:
    https://iceberg.apache.org/spark-ddl/#partitioned-by
    
    E.g. Impala is using the following syntax:
    
    CREATE TABLE ice_t (i int, s string, ts timestamp, d date)
    PARTITION BY SPEC (i BUCKET 5, ts MONTH, d YEAR)
    STORED AS ICEBERG;
    
    The same in Spark is:
    
    CREATE TABLE ice_t (i int, s string, ts timestamp, d date)
    USING ICEBERG
    PARTITIONED BY (bucket(5, i), months(ts), years(d))
    
    HIVE-25179 added the following syntax for Hive:
    
    CREATE TABLE ice_t (i int, s string, ts timestamp, d date)
    PARTITIONED BY SPEC (bucket(5, i), months(ts), years(d))
    STORED BY ICEBERG;
    
    I.e. the same syntax as Spark, but adding the keyword "SPEC".
    
    This patch makes Impala use Hive's syntax, i.e. we will also
    use the PARTITIONED BY SPEC clause + the unified partition
    transform syntax.
    
    Testing:
     * existing tests have been rewritten with the new syntax
    
    Change-Id: Ib72ae445fd68fb0ab75d87b34779dbab922bbc62
    Reviewed-on: http://gerrit.cloudera.org:8080/17575
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/cup/sql-parser.cup                     | 32 ++++++----
 .../apache/impala/analysis/CreateTableStmt.java    |  2 +-
 .../impala/analysis/IcebergPartitionField.java     |  4 +-
 .../impala/analysis/IcebergPartitionSpec.java      | 12 ++--
 .../impala/analysis/IcebergPartitionTransform.java | 19 ++++++
 .../apache/impala/analysis/TableDataLayout.java    |  2 +-
 .../org/apache/impala/analysis/ToSqlUtils.java     |  4 +-
 .../java/org/apache/impala/util/IcebergUtil.java   | 21 +++----
 .../apache/impala/analysis/AnalyzeStmtsTest.java   | 25 +++++---
 .../org/apache/impala/analysis/ParserTest.java     |  4 --
 .../functional/functional_schema_template.sql      |  4 +-
 .../queries/QueryTest/iceberg-create.test          | 64 +++++++++----------
 .../queries/QueryTest/iceberg-ctas.test            |  8 +--
 .../queries/QueryTest/iceberg-negative.test        | 36 +++++------
 .../queries/QueryTest/iceberg-overwrite.test       |  4 +-
 .../iceberg-partition-transform-insert.test        | 18 +++---
 .../QueryTest/iceberg-partitioned-insert.test      | 26 ++++----
 .../queries/QueryTest/iceberg-truncate.test        |  2 +-
 .../queries/QueryTest/show-create-table.test       | 72 +++++++---------------
 tests/custom_cluster/test_event_processing.py      |  2 +-
 20 files changed, 181 insertions(+), 180 deletions(-)
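
Note on the syntax change described in the commit message: under PARTITIONED BY SPEC, an identity partition is written as the bare column name and every other transform wraps the column as TRANSFORM([param, ]column). The following is a minimal standalone sketch of that rendering rule, loosely modeled on the toSql(String colName) method this patch adds to IcebergPartitionTransform. The class PartitionSpecSketch and its method names are hypothetical and plain strings stand in for Impala's thrift enum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for IcebergPartitionField / IcebergPartitionTransform.
final class PartitionSpecSketch {
  // Renders one partition field. 'param' is null unless the transform
  // takes a parameter (BUCKET and TRUNCATE in the patch).
  static String fieldToSql(String column, String transform, Integer param) {
    // Identity partitions are written as the bare column name.
    if ("IDENTITY".equalsIgnoreCase(transform)) return column;
    StringBuilder sb = new StringBuilder(transform.toUpperCase(Locale.ROOT)).append("(");
    if (param != null) sb.append(param).append(", ");
    return sb.append(column).append(")").toString();
  }

  // Joins the rendered fields into a PARTITIONED BY SPEC clause.
  static String specToSql(List<String> fields) {
    return "PARTITIONED BY SPEC (" + String.join(", ", fields) + ")";
  }

  public static void main(String[] args) {
    List<String> fields = new ArrayList<>();
    fields.add(fieldToSql("i", "BUCKET", 5));
    fields.add(fieldToSql("ts", "MONTH", null));
    fields.add(fieldToSql("d", "YEAR", null));
    // Prints: PARTITIONED BY SPEC (BUCKET(5, i), MONTH(ts), YEAR(d))
    System.out.println(specToSql(fields));
  }
}
```

The old form of the same spec was "PARTITION BY SPEC (i BUCKET 5, ts MONTH, d YEAR)". The sketch only covers rendering; parsing and the parameter checks stay in the grammar and analysis code of the patch.
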

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 2115c89..5827b7d 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -1776,7 +1776,7 @@ iceberg_partition_spec_def ::=
   ;
 
 iceberg_partition_field_list ::=
-  KW_PARTITION KW_BY KW_SPEC LPAREN iceberg_partition_field_defs:cols RPAREN
+  KW_PARTITIONED KW_BY KW_SPEC LPAREN iceberg_partition_field_defs:cols RPAREN
   {: RESULT = cols; :}
   ;
 
@@ -1795,16 +1795,26 @@ iceberg_partition_field_defs ::=
   ;
 
 iceberg_partition_field_def ::=
-  IDENT:col_name iceberg_partition_transform:partition_transform
-  {: RESULT = new IcebergPartitionField(col_name, partition_transform); :}
-  ;
-
-iceberg_partition_transform ::=
-  iceberg_partition_transform_type:transfrom_type INTEGER_LITERAL:transform_param
-  {: RESULT = IcebergUtil.getPartitionTransform(transfrom_type,
-      transform_param.intValue()); :}
-  | IDENT:transfrom_type
-  {: RESULT = IcebergUtil.getPartitionTransform(transfrom_type); :}
+  IDENT:col_name
+  {:
+    RESULT = new IcebergPartitionField(col_name,
+                                       IcebergUtil.getPartitionTransform("IDENTITY"));
+  :}
+  |
+  iceberg_partition_transform_type:partition_transform
+  LPAREN IDENT:col_name RPAREN
+  {:
+    RESULT = new IcebergPartitionField(col_name, IcebergUtil.getPartitionTransform(
+                                       partition_transform));
+  :}
+  | iceberg_partition_transform_type:partition_transform
+    LPAREN
+      INTEGER_LITERAL:transform_param COMMA IDENT:col_name
+    RPAREN
+  {:
+    RESULT = new IcebergPartitionField(col_name, IcebergUtil.getPartitionTransform(
+                                       partition_transform, transform_param.intValue()));
+  :}
   ;
 
 iceberg_partition_transform_type ::=
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index a4f4ec0..cdc7fad 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -298,7 +298,7 @@ public class CreateTableStmt extends StatementBase {
       List<IcebergPartitionSpec> iceSpec = tableDef_.getIcebergPartitionSpecs();
       if (iceSpec != null && !iceSpec.isEmpty()) {
         throw new AnalysisException(
-            "PARTITION BY SPEC is only valid for Iceberg tables.");
+            "PARTITIONED BY SPEC is only valid for Iceberg tables.");
       }
     }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
index 659530a..bb77c36 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
@@ -85,9 +85,7 @@ public class IcebergPartitionField extends StmtNode {
 
   @Override
   public String toSql(ToSqlOptions options) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(origFieldName_+ " " + transform_.toSql());
-    return builder.toString();
+    return transform_.toSql(origFieldName_);
   }
 
   public TIcebergPartitionField toThrift() {
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
index 1e96401..4153f2e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
@@ -27,15 +27,15 @@ import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
 
 /**
- * Represents the partitioning of a Iceberg table as defined in the PARTITION BY SPEC
+ * Represents the partitioning of an Iceberg table as defined in the PARTITIONED BY SPEC
  * clause of a CREATE TABLE statement. Iceberg supported kinds of partition.
  * Examples:
- * PARTITION BY SPEC
+ * PARTITIONED BY SPEC
  * (
- * dt identity,
- * event_time hour,
- * event_time day,
- * event_time month
+ * dt,
+ * hour(event_time),
+ * day(event_time),
+ * month(event_time)
  * )
  */
 public class IcebergPartitionSpec extends StmtNode {
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionTransform.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionTransform.java
index 8877fa1..a20509e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionTransform.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionTransform.java
@@ -87,6 +87,25 @@ public class IcebergPartitionTransform extends StmtNode {
     return builder.toString();
   }
 
+  public final String toSql(String colName) {
+    return toSql(colName, ToSqlOptions.DEFAULT);
+  }
+
+  public String toSql(String colName, ToSqlOptions options) {
+    StringBuilder builder = new StringBuilder();
+    if (transformType_ != TIcebergPartitionTransformType.IDENTITY) {
+      builder.append(transformType_.toString()).append ("(");
+      if (transformParam_ != null) {
+        builder.append(transformParam_.toString()).append(", ");
+      }
+    }
+    builder.append(colName);
+    if (transformType_ != TIcebergPartitionTransformType.IDENTITY) {
+      builder.append(")");
+    }
+    return builder.toString();
+  }
+
   public TIcebergPartitionTransform toThrift() {
     TIcebergPartitionTransform transform = new TIcebergPartitionTransform();
     transform.setTransform_type(transformType_);
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
index d11c2c1..fd1005b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 /**
  * Represents the PARTITION BY and PARTITIONED BY clauses of a DDL statement.
- * We can use PARTITION BY SPEC clause to create iceberg table partitions.
+ * We can use PARTITIONED BY SPEC clause to create iceberg table partitions.
  */
 class TableDataLayout {
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 0aa5404..6b4bf87 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -419,7 +419,7 @@ public class ToSqlUtils {
         properties.remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
         properties.remove(IcebergTable.METADATA_LOCATION);
 
-        // Fill "PARTITION BY SPEC" part if the Iceberg table is partitioned.
+        // Fill "PARTITIONED BY SPEC" part if the Iceberg table is partitioned.
         FeIcebergTable feIcebergTable= (FeIcebergTable)table;
         if (!feIcebergTable.getPartitionSpecs().isEmpty()) {
           IcebergPartitionSpec latestPartitionSpec =
@@ -503,7 +503,7 @@ public class ToSqlUtils {
           Joiner.on(", \n  ").join(sortProperties.first)));
     }
     if (icebergPartitions != null && !icebergPartitions.isEmpty()) {
-      sb.append("PARTITION BY SPEC\n");
+      sb.append("PARTITIONED BY SPEC\n");
       sb.append(icebergPartitions);
       sb.append("\n");
     }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 5be0a9f..48c6789 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -284,24 +284,23 @@ public class IcebergUtil {
 
   public static TIcebergPartitionTransformType getPartitionTransformType(
       String transformType) throws TableLoadingException {
+    Preconditions.checkNotNull(transformType);
     transformType = transformType.toUpperCase();
     if ("IDENTITY".equals(transformType)) {
       return TIcebergPartitionTransformType.IDENTITY;
-    } else if ("HOUR".equals(transformType)) {
-      return TIcebergPartitionTransformType.HOUR;
-    } else if ("DAY".equals(transformType)) {
-      return TIcebergPartitionTransformType.DAY;
-    } else if ("MONTH".equals(transformType)) {
-      return TIcebergPartitionTransformType.MONTH;
-    } else if ("YEAR".equals(transformType)) {
-      return TIcebergPartitionTransformType.YEAR;
     } else if (transformType != null && transformType.startsWith("BUCKET")) {
       return TIcebergPartitionTransformType.BUCKET;
     } else if (transformType != null && transformType.startsWith("TRUNCATE")) {
       return TIcebergPartitionTransformType.TRUNCATE;
-    } else {
-      throw new TableLoadingException("Unsupported iceberg partition type: " +
-      transformType);
+    }
+    switch (transformType) {
+      case "HOUR":  case "HOURS":  return TIcebergPartitionTransformType.HOUR;
+      case "DAY":   case "DAYS":   return TIcebergPartitionTransformType.DAY;
+      case "MONTH": case "MONTHS": return TIcebergPartitionTransformType.MONTH;
+      case "YEAR":  case "YEARS":  return TIcebergPartitionTransformType.YEAR;
+      default:
+        throw new TableLoadingException("Unsupported iceberg partition type: " +
+            transformType);
     }
   }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 12aa6bd..23b6345 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4858,21 +4858,30 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
   public void testCreatePartitionedIcebergTable() throws ImpalaException {
     String tblProperties = " TBLPROPERTIES ('iceberg.catalog'='hadoop.tables')";
     AnalyzesOk("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
-        "PARTITION BY SPEC (p1 BUCKET 10, p1 TRUNCATE 5, p2 DAY) STORED AS ICEBERG" +
-            tblProperties);
+        "PARTITIONED BY SPEC (BUCKET(10, p1), TRUNCATE(5, p1), DAY(p2)) " +
+        "STORED AS ICEBERG" + tblProperties);
+    AnalyzesOk("CREATE TABLE tbl1 (ts timestamp) " +
+        "PARTITIONED BY SPEC (YEAR(ts), MONTH(ts), DAY(ts), HOUR(ts)) " +
+        "STORED AS ICEBERG" + tblProperties);
+    AnalyzesOk("CREATE TABLE tbl1 (ts timestamp) " +
+        "PARTITIONED BY SPEC (YEARS(ts), MONTHS(ts), DAYS(ts), HOURS(ts)) " +
+        "STORED AS ICEBERG" + tblProperties);
     AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
-        "PARTITION BY SPEC (p1 BUCKET, p2 DAY) STORED AS ICEBERG" + tblProperties,
+        "PARTITIONED BY SPEC (BUCKET(p1), DAY(p2)) STORED AS ICEBERG" + tblProperties,
+        "BUCKET and TRUNCATE partition transforms should have a parameter.");
+    AnalysisError("CREATE TABLE tbl1 (i int, p1 int) " +
+        "PARTITIONED BY SPEC (TRUNCATE(p1)) STORED AS ICEBERG",
         "BUCKET and TRUNCATE partition transforms should have a parameter.");
     AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
-        "PARTITION BY SPEC (p1 BUCKET 0, p2 DAY) STORED AS ICEBERG" + tblProperties,
+        "PARTITIONED BY SPEC (BUCKET(0, p1), DAY(p2)) STORED AS ICEBERG" + tblProperties,
         "The parameter of a partition transform should be greater than zero.");
     AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
-        "PARTITION BY SPEC (p1 TRUNCATE 0, p2 DAY) STORED AS ICEBERG" + tblProperties,
+        "PARTITIONED BY SPEC (TRUNCATE(0, p1), DAY(p2)) STORED AS ICEBERG" +
+        tblProperties,
         "The parameter of a partition transform should be greater than zero.");
-
     AnalysisError("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " +
-        "PARTITION BY SPEC (p1 BUCKET 10, p2 DAY 10) STORED AS ICEBERG" + tblProperties,
+        "PARTITIONED BY SPEC (BUCKET(10, p1), DAY(10, p2)) STORED AS ICEBERG" +
+        tblProperties,
         "Only BUCKET and TRUNCATE partition transforms accept a parameter.");
   }
-
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 88b34be..b17fcad 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2997,10 +2997,6 @@ public class ParserTest extends FrontendTestBase {
     ParserError("CREATE TABLE Foo(a int PRIMARY KEY, b int BLOCK_SIZE 1+1) " +
         "STORED AS KUDU");
     ParserError("CREATE TABLE Foo(a int PRIMARY KEY BLOCK_SIZE -1) STORED AS KUDU");
-
-    // Iceberg TRUNCATE partition transform without parameter results a parse error.
-    ParserError("CREATE TABLE tbl1 (i int, p1 int) PARTITION BY SPEC (p1 TRUNCATE) " +
-        "STORED AS ICEBERG TBLPROPERTIES ('iceberg.catalog'='hadoop.tables')");
   }
 
   @Test
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 5698947..3c0e900 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3050,7 +3050,7 @@ functional
 iceberg_int_partitioned
 ---- CREATE
 CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (i INT, j INT, k INT)
-PARTITION BY SPEC (i identity, j identity)
+PARTITIONED BY SPEC (i, j)
 STORED AS ICEBERG;
 ====
 ---- DATASET
@@ -3060,7 +3060,7 @@ iceberg_partition_transforms_zorder
 ---- CREATE
 CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 (ts timestamp, s string, i int, j int)
-PARTITION BY SPEC (ts year, s bucket 5)
+PARTITIONED BY SPEC (year(ts), bucket(5, s))
 SORT BY ZORDER (i, j)
 STORED AS ICEBERG;
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
index eba73d8..5486554 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
@@ -9,14 +9,14 @@ CREATE TABLE iceberg_hadoop_tables(
   map_test MAP <STRING, array <STRING>>,
   struct_test STRUCT <f1: BIGINT, f2: BIGINT>
 )
-PARTITION BY SPEC
+PARTITIONED BY SPEC
 (
-  level IDENTITY,
-  level TRUNCATE 10,
-  event_time IDENTITY,
-  event_time HOUR,
-  event_time BUCKET 1000,
-  register_time DAY
+  level,
+  TRUNCATE(10, level),
+  event_time,
+  HOURS(event_time),
+  BUCKET(1000, event_time),
+  DAY(register_time)
 )
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
@@ -78,11 +78,11 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 CREATE TABLE iceberg_hadoop_tbls_with_loc(
   level STRING
 )
-PARTITION BY SPEC
+PARTITIONED BY SPEC
 (
-  level IDENTITY,
-  level BUCKET 12345,
-  level TRUNCATE 15
+  level,
+  BUCKET(12345, level),
+  TRUNCATE(15, level)
 )
 STORED AS ICEBERG
 LOCATION '/$DATABASE.iceberg_test_with_location'
@@ -153,14 +153,14 @@ CREATE TABLE iceberg_hadoop_catalog(
   map_test MAP <STRING, array <STRING>>,
   struct_test STRUCT <f1: BIGINT, f2: BIGINT>
 )
-PARTITION BY SPEC
+PARTITIONED BY SPEC
 (
-  level IDENTITY,
-  level TRUNCATE 10,
-  event_time IDENTITY,
-  event_time HOUR,
-  event_time BUCKET 1000,
-  register_time DAY
+  level,
+  TRUNCATE(10, level),
+  event_time,
+  HOUR(event_time),
+  BUCKET(1000, event_time),
+  DAYS(register_time)
 )
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
@@ -202,11 +202,11 @@ DROP TABLE iceberg_hadoop_catalog;
 CREATE TABLE iceberg_hadoop_catalog(
   level STRING
 )
-PARTITION BY SPEC
+PARTITIONED BY SPEC
 (
-  level IDENTITY,
-  level BUCKET 12345,
-  level TRUNCATE 10
+  level,
+  BUCKET(12345, level),
+  TRUNCATE(10, level)
 )
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
@@ -327,12 +327,12 @@ CREATE TABLE iceberg_hadoop_cat_with_orc(
   map_test MAP <STRING, array <STRING>>,
   struct_test STRUCT <f1: BIGINT, f2: BIGINT>
 )
-PARTITION BY SPEC
+PARTITIONED BY SPEC
 (
-  level IDENTITY,
-  event_time IDENTITY,
-  event_time HOUR,
-  register_time DAY
+  level,
+  event_time,
+  HOUR(event_time),
+  DAY(register_time)
 )
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.file_format'='orc','iceberg.catalog'='hadoop.catalog',
@@ -395,12 +395,12 @@ CREATE TABLE iceberg_part_hive_cat(
   map_test MAP <STRING, array <STRING>>,
   struct_test STRUCT <f1: BIGINT, f2: BIGINT>
 )
-PARTITION BY SPEC
+PARTITIONED BY SPEC
 (
-  level IDENTITY,
-  event_time IDENTITY,
-  event_time HOUR,
-  register_time DAY
+  level,
+  event_time,
+  HOUR(event_time),
+  DAY(register_time)
 )
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hive.catalog')
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-ctas.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-ctas.test
index eddac95..6cc17b2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-ctas.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-ctas.test
@@ -57,8 +57,8 @@ STRING, STRING, STRING
 ---- QUERY
 # Test CTAS in Iceberg HadoopTables catalog.
 # Set table location to custom location.
-# Use PARTITION BY SPEC
-CREATE TABLE ice_ctas_hadoop_tables_part PARTITION BY SPEC (d month)
+# Use PARTITIONED BY SPEC
+CREATE TABLE ice_ctas_hadoop_tables_part PARTITIONED BY SPEC (month(d))
 STORED AS ICEBERG
 LOCATION '/test-warehouse/$DATABASE.db/loc_test'
 TBLPROPERTIES ('iceberg.catalog'='hadoop.tables') AS SELECT s, ts, d FROM value_tbl;
@@ -78,10 +78,10 @@ STRING, STRING, STRING
 ---- QUERY
 # Test CTAS in Iceberg HadoopCatalog catalog.
 # Set 'iceberg.catalog_location' and 'iceberg.table_identifier'
-# Partition by TRUNCATE
+# Partitioned by TRUNCATE
 # Cast TINYINT to INT.
 # INSERT additional row.
-CREATE TABLE ice_ctas_hadoop_catalog_part PARTITION BY SPEC (s truncate 3)
+CREATE TABLE ice_ctas_hadoop_catalog_part PARTITIONED BY SPEC (truncate(3, s))
 STORED AS ICEBERG
 TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
                'iceberg.catalog_location'='/test-warehouse/$DATABASE.db/cat_loc',
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 2752e8f..d287c78 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -9,20 +9,16 @@ AnalysisException: Table requires at least 1 column for managed iceberg table.
 CREATE TABLE iceberg_test(
   level STRING
 )
-PARTITION BY SPEC
-(
-  level IDENTITY,
-  event_time HOUR
-)
+PARTITIONED BY SPEC(level, hour(event_time))
 STORED AS ICEBERG;
 ---- CATCH
 AnalysisException: Cannot find source column: event_time
 ====
 ---- QUERY
 CREATE TABLE non_iceberg_table_with_spec (i INT)
-PARTITION BY SPEC (i identity);
+PARTITIONED BY SPEC (i);
 ---- CATCH
-AnalysisException: PARTITION BY SPEC is only valid for Iceberg tables.
+AnalysisException: PARTITIONED BY SPEC is only valid for Iceberg tables.
 ====
 ---- QUERY
 CREATE TABLE iceberg_table_hadoop_tables(
@@ -94,7 +90,7 @@ row_regex:.*CAUSED BY: TableLoadingException: Table does not exist: fake_db.fake
 ====
 ---- QUERY
 CREATE TABLE iceberg_overwrite_bucket (i int)
-PARTITION BY SPEC (i bucket 3)
+PARTITIONED BY SPEC (bucket(3, i))
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 INSERT OVERWRITE iceberg_overwrite_bucket VALUES (1), (2), (3);
@@ -139,10 +135,7 @@ CREATE TABLE iceberg_partitioned_insert(
   level STRING,
   event_time TIMESTAMP
 )
-PARTITION BY SPEC
-(
-  level IDENTITY
-)
+PARTITIONED BY SPEC(level)
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ---- RESULTS
@@ -155,8 +148,8 @@ Static partitioning is not supported for Iceberg tables.
 ====
 ---- QUERY
 CREATE TABLE all_colss_needed_for_insert (i int, j int, k int)
-partition by spec (j identity, k identity)
-stored as iceberg;
+PARTITIONED BY SPEC (j, k)
+STORED AS ICEBERG;
 ---- RESULTS
 'Table has been created.'
 ====
@@ -316,18 +309,18 @@ STORED AS ICEBERG
 AnalysisException: Unsupported column options for file format 'ICEBERG': 'id INT PRIMARY KEY'
 ====
 ---- QUERY
-# PARTITIONED BY and PARTITION BY SPEC is not allowed in same statement.
+# PARTITIONED BY and PARTITIONED BY SPEC is not allowed in same statement.
 CREATE TABLE iceberg_part_spec_part (i INT)
 PARTITIONED BY (p INT)
-PARTITION BY SPEC (i TRUNCATE 10)
+PARTITIONED BY SPEC (TRUNCATE(10, i))
 STORED AS ICEBERG;
 ---- CATCH
 Syntax error in line
 ====
 ---- QUERY
-# PARTITION BY SPEC and PARTITIONED BY is not allowed in same statement.
+# PARTITIONED BY SPEC and PARTITIONED BY is not allowed in same statement.
 CREATE TABLE iceberg_part_part_spec (i INT)
-PARTITION BY SPEC (i TRUNCATE 10)
+PARTITIONED BY SPEC (TRUNCATE(10, i))
 PARTITIONED BY (p INT)
 STORED AS ICEBERG;
 ---- CATCH
@@ -347,3 +340,10 @@ ALTER TABLE iceberg_set_wrong_fileformat SET TBLPROPERTIES ('iceberg.file_format
 ---- CATCH
 Invalid fileformat for Iceberg table: parq
 ====
+---- QUERY
+CREATE TABLE iceberg_wrong_partition (i int)
+PARTITIONED BY SPEC (wrong(i))
+STORED AS ICEBERG;
+---- CATCH
+Unsupported iceberg partition type: WRONG
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
index 3ad743b..4b12d3b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
@@ -85,7 +85,7 @@ INT,INT
 ---- QUERY
 # Create DAY-partitioned table for INSERT OVERWRITE
 create table ice_day (ts timestamp)
-partition by spec (ts DAY)
+partitioned by spec (DAY(ts))
 stored as iceberg;
 ---- RESULTS
 'Table has been created.'
@@ -131,7 +131,7 @@ TIMESTAMP
 ---- QUERY
 # Create TRUNCATE-partitioned table for INSERT OVERWRITE
 create table ice_trunc (d decimal(10, 2))
-partition by spec (d TRUNCATE 100)
+partitioned by spec (TRUNCATE(100, d))
 stored as iceberg;
 ---- RESULTS
 'Table has been created.'
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
index 0c9c623..1d75729 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
@@ -3,7 +3,7 @@
 # Test partitioned INSERTs with single column that is also
 # the partitioned column. Partition transform is BUCKET.
 create table single_col_bucket (s string)
-partition by spec (s bucket 7)
+partitioned by spec (bucket(7, s))
 stored as iceberg;
 ====
 ---- QUERY
@@ -56,7 +56,7 @@ aggregation(SUM, NumRowGroups): 1
 ---- QUERY
 # Bucket transform for multiple columns.
 create table multi_col_bucket (i int, s string, d date, t timestamp)
-partition by spec (i bucket 3, s bucket 5, d bucket 5, t bucket 5)
+partitioned by spec (bucket(3, i), bucket(5, s), bucket(5, d), bucket(5, t))
 stored as iceberg;
 ====
 ---- QUERY
@@ -147,7 +147,7 @@ aggregation(SUM, NumRowGroups): 2
 ---- QUERY
 # Test single col TRUNCATE
 create table single_col_truncate (d decimal(10, 4))
-partition by spec (d truncate 100)
+partitioned by spec (truncate(100, d))
 stored as iceberg;
 ====
 ---- QUERY
@@ -227,7 +227,7 @@ aggregation(SUM, NumRowGroups): 3
 ---- QUERY
 # Test single col TRUNCATE
 create table multi_col_truncate (i int, b bigint, d decimal(16, 6), s string)
-partition by spec (s truncate 15, i truncate 5, b truncate 11, d truncate 100000)
+partitioned by spec (truncate(15, s), truncate(5, i), truncate(11, b), truncate(100000, d))
 stored as iceberg;
 ====
 ---- QUERY
@@ -337,7 +337,7 @@ aggregation(SUM, NumRowGroups): 1
 ---- QUERY
 # Create table with YEAR partition transform
 create table year_transform(t timestamp, d date)
-partition by spec (t year, d year)
+partitioned by spec (year(t), year(d))
 stored as iceberg;
 ====
 ---- QUERY
@@ -457,7 +457,7 @@ aggregation(SUM, NumRowGroups): 2
 ---- QUERY
 # Create table with MONTH partition transform
 create table month_transform(t timestamp, d date)
-partition by spec (t month, d month)
+partitioned by spec (month(t), month(d))
 stored as iceberg;
 ====
 ---- QUERY
@@ -565,7 +565,7 @@ aggregation(SUM, NumRowGroups): 2
 ---- QUERY
 # Create table with DAY partition transform
 create table day_transform(t timestamp, d date)
-partition by spec (t day, d day)
+partitioned by spec (day(t), day(d))
 stored as iceberg;
 ====
 ---- QUERY
@@ -689,7 +689,7 @@ aggregation(SUM, NumRowGroups): 1
 ---- QUERY
 # Create table with HOUR partition transform
 create table hour_transform(t timestamp)
-partition by spec (t hour)
+partitioned by spec (hour(t))
 stored as iceberg;
 ====
 ---- QUERY
@@ -786,7 +786,7 @@ aggregation(SUM, NumRowGroups): 3
 ====
 ---- QUERY
 create table mixed_and_shuffled (s string, b bigint, de decimal(6, 2), t timestamp, da date)
-partition by spec (t day, da year, s truncate 5, b bucket 3, de truncate 100)
+partitioned by spec (day(t), year(da), truncate(5, s), bucket(3, b), truncate(100, de))
 stored as iceberg;
 ====
 ---- QUERY
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index 85534fc..4a114dc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -3,7 +3,7 @@
 # Test partitioned INSERTs with single column that is also
 # the partitioned column.
 create table ice_only_part (i int)
-partition by spec (i identity)
+partitioned by spec (i)
 stored as iceberg;
 ---- RESULTS
 'Table has been created.'
@@ -44,7 +44,7 @@ aggregation(SUM, RowsRead): 1
 ---- QUERY
 # Test inserts with multple partition columns.
 create table ice_multi_part (i int, d date, s string)
-partition by spec(i identity, d identity)
+partitioned by spec(i, d)
 stored as iceberg;
 ---- RESULTS
 'Table has been created.'
@@ -72,7 +72,7 @@ aggregation(SUM, RowsRead): 1
 ---- QUERY
 # Test that Impala only writes one file per partitions.
 create table ice_bigints (i BIGINT, j BIGINT, k BIGINT)
-partition by spec (i identity, j identity)
+partitioned by spec (i, j)
 stored as iceberg;
 ---- RESULTS
 'Table has been created.'
@@ -110,15 +110,15 @@ create table alltypes_part (
   date_col DATE,
   string_col STRING,
   timestamp_col TIMESTAMP)
-partition by spec (
-  id identity,
-  bool_col identity,
-  int_col identity,
-  bigint_col identity,
-  float_col identity,
-  double_col identity,
-  date_col identity,
-  string_col identity)
+partitioned by spec (
+  id,
+  bool_col,
+  int_col,
+  bigint_col,
+  float_col,
+  double_col,
+  date_col,
+  string_col)
 stored as iceberg;
 ---- RESULTS
 'Table has been created.'
@@ -226,7 +226,7 @@ aggregation(SUM, NumRowGroups): 8
 ---- QUERY
 # Test inserts with multple partition columns.
 create table ice_part_non_order (i int, d date, s string)
-partition by spec(s identity, d identity)
+partitioned by spec(s, d)
 stored as iceberg;
 ---- RESULTS
 'Table has been created.'
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
index b64f67f..1385d15 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
@@ -38,7 +38,7 @@ STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
 ---- QUERY
 # Create partitioned Iceberg table
 create table ice_part (i int, s string, t timestamp)
-partition by spec (t year, i bucket 10)
+partitioned by spec (year(t), bucket(10, i))
 stored as iceberg;
 insert into ice_part
 values (1, 'ice',  '2021-01-27 18:57:25.155746000'),
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 6b2591a..8dcc4b6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -633,36 +633,14 @@ CREATE TABLE iceberg_test1_partitioned (
   p3 TIMESTAMP,
   p4 INT
 )
-PARTITION BY SPEC (
-  p1 YEAR,
-  p2 IDENTITY,
-  p2 BUCKET 500,
-  p2 TRUNCATE 15,
-  p3 HOUR,
-  p4 BUCKET 10,
-  p4 TRUNCATE 5
-)
-STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet',
-    'iceberg.catalog'='hadoop.catalog',
-    'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
----- RESULTS-HIVE
-CREATE TABLE show_create_table_test_db.iceberg_test1_partitioned (
-  level STRING NULL,
-  p1 DATE NULL,
-  p2 STRING NULL,
-  p3 TIMESTAMP NULL,
-  p4 INT NULL
-)
-PARTITION BY SPEC
-(
-  p1 YEAR,
-  p2 IDENTITY,
-  p2 BUCKET 500,
-  p2 TRUNCATE 15,
-  p3 HOUR,
-  p4 BUCKET 10,
-  p4 TRUNCATE 5
+PARTITIONED BY SPEC (
+  YEAR(p1),
+  p2,
+  BUCKET(500, p2),
+  TRUNCATE(15, p2),
+  HOUR(p3),
+  BUCKET(10, p4),
+  TRUNCATE(5, p4)
 )
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.file_format'='parquet',
@@ -676,15 +654,14 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test1_partitioned (
   p3 TIMESTAMP NULL,
   p4 INT NULL
 )
-PARTITION BY SPEC
-(
-  p1 YEAR,
-  p2 IDENTITY,
-  p2 BUCKET 500,
-  p2 TRUNCATE 15,
-  p3 HOUR,
-  p4 BUCKET 10,
-  p4 TRUNCATE 5,
+PARTITIONED BY SPEC (
+  YEAR(p1),
+  p2,
+  BUCKET(500, p2),
+  TRUNCATE(15, p2),
+  HOUR(p3),
+  BUCKET(10, p4),
+  TRUNCATE(5, p4)
 )
 STORED AS ICEBERG
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.catalog'='hadoop.catalog',
@@ -799,10 +776,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_old_style_partitions (
   p STRING NULL,
   d DATE NULL
 )
-PARTITION BY SPEC (
-  p IDENTITY,
-  d IDENTITY
-)
+PARTITIONED BY SPEC (p, d)
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
@@ -810,7 +784,7 @@ TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_ctas
-PARTITION BY SPEC(id bucket 5)
+PARTITIONED BY SPEC (bucket(5, id))
 STORED AS ICEBERG
 AS SELECT id, bool_col, bigint_col FROM functional.alltypes;
 ---- RESULTS-HIVE-3
@@ -819,9 +793,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_ctas (
   bool_col BOOLEAN NULL,
   bigint_col BIGINT NULL
 )
-PARTITION BY SPEC (
-  id BUCKET 5
-)
+PARTITIONED BY SPEC (BUCKET(5, id))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
@@ -829,7 +801,7 @@ TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_ctas_ht
-PARTITION BY SPEC(id bucket 5)
+PARTITIONED BY SPEC (bucket(5, id))
 STORED AS ICEBERG
 TBLPROPERTIES ('iceberg.catalog'='hadoop.tables')
 AS SELECT id, bool_col, bigint_col FROM functional.alltypes;
@@ -839,9 +811,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_ctas_ht (
   bool_col BOOLEAN NULL,
   bigint_col BIGINT NULL
 )
-PARTITION BY SPEC (
-  id BUCKET 5
-)
+PARTITIONED BY SPEC (BUCKET(5, id))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index e5ed34d..f3272ba 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -224,7 +224,7 @@ class TestEventProcessing(CustomClusterTestSuite):
       self.execute_query("create database if not exists {0}".format(db_name))
       self.execute_query("""
           create table {0}.{1} (i int)
-          partition by spec (i bucket 5)
+          partitioned by spec (bucket(5, i))
           stored as iceberg;""".format(db_name, tbl_name))
       self.execute_query("insert into {0}.{1} values (1)".format(db_name, tbl_name))
       data = self.execute_scalar("select * from {0}.{1}".format(db_name, tbl_name))