Posted to commits@impala.apache.org by st...@apache.org on 2021/03/13 02:24:32 UTC

[impala] 08/08: IMPALA-10222: CREATE TABLE AS SELECT for Iceberg tables

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6c6b0ee869a766ba816e82c2fc821e499b2041c2
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Feb 19 15:30:17 2021 +0100

    IMPALA-10222: CREATE TABLE AS SELECT for Iceberg tables
    
    This patch adds support for CREATE TABLE AS SELECT statements
    for Iceberg tables.
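    
    For example, the e2e tests added below exercise CTAS statements like
    the following (a minimal sketch; the table 'value_tbl', its columns
    and the 'month' partition transform are taken from those tests):
    
      CREATE TABLE ice_ctas_part
      PARTITION BY SPEC (d month)
      STORED AS ICEBERG
      AS SELECT s, ts, d FROM value_tbl;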
    
    CTAS statements in Impala work as follows:
    
    1. Analyze the whole CTAS statement
    2. Split the CTAS into a CREATE statement and an INSERT statement
       (see the sketch after this list)
    3. Create a temporary in-memory target table from the CREATE statement
    4. Analyze the INSERT statement using the temporary target table
    5. If everything is OK so far, create the target table
    6. Execute the INSERT query
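    
    As a sketch of steps 2-6 ('ice_tgt' and 'src' are hypothetical
    names), a statement such as
    
      CREATE TABLE ice_tgt STORED AS ICEBERG AS SELECT i, s FROM src;
    
    is handled roughly as if the user had issued
    
      CREATE TABLE ice_tgt (i INT, s STRING) STORED AS ICEBERG;
      INSERT INTO ice_tgt SELECT i, s FROM src;
    
    except that the real target table is only created once the INSERT
    statement has been analyzed successfully.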
    
    For Iceberg tables the non-trivial part was creating the temporary
    target table without actually creating it via the Iceberg API. I've
    added a new class, 'IcebergCtasTarget', that mimics an FeIcebergTable;
    it works with both catalog V1 and catalog V2.
    
    Testing:
     * e2e CTAS tests in iceberg-ctas.test
     * SHOW CREATE TABLE statements in show-create-table.test
    
    Change-Id: I81d2084e401b9fa74d5ad161b51fd3e2aa3fcc67
    Reviewed-on: http://gerrit.cloudera.org:8080/17130
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/cup/sql-parser.cup                     |  10 +
 .../impala/analysis/CreateTableAsSelectStmt.java   |  18 +-
 .../org/apache/impala/analysis/InsertStmt.java     |  36 ++-
 .../org/apache/impala/catalog/CtasTargetTable.java | 176 +++++++++++++
 .../org/apache/impala/catalog/FeIcebergTable.java  |  23 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |   2 +-
 .../impala/catalog/iceberg/IcebergCtasTarget.java  | 279 +++++++++++++++++++++
 .../org/apache/impala/catalog/local/LocalDb.java   |   2 +-
 .../apache/impala/catalog/local/LocalFsTable.java  |   2 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   3 +-
 .../apache/impala/util/IcebergSchemaConverter.java |   2 +-
 .../java/org/apache/impala/util/IcebergUtil.java   |  13 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |   4 +-
 .../queries/QueryTest/iceberg-ctas.test            | 105 ++++++++
 .../queries/QueryTest/iceberg-negative.test        |  10 +-
 .../queries/QueryTest/show-create-table.test       |  39 +++
 tests/metadata/test_show_create_table.py           |   5 +-
 tests/query_test/test_iceberg.py                   |   3 +
 18 files changed, 686 insertions(+), 46 deletions(-)

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 6fabdd7..bbb802a 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -1349,6 +1349,16 @@ create_tbl_as_select_params ::=
         select_stmt, null);
   :}
   | tbl_def_without_col_defs:tbl_def
+    iceberg_partition_spec_list:iceberg_specs
+    tbl_options:options
+    KW_AS query_stmt:select_stmt
+  {:
+    tbl_def.getIcebergPartitionSpecs().addAll(iceberg_specs);
+    tbl_def.setOptions(options);
+    RESULT = new CreateTableAsSelectStmt.CtasParams(new CreateTableStmt(tbl_def),
+        select_stmt, null);
+  :}
+  | tbl_def_without_col_defs:tbl_def
     KW_PARTITIONED KW_BY LPAREN ident_list:partition_cols RPAREN
     tbl_options:options
     KW_AS query_stmt:select_stmt
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 7dc4639..cf952b6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -28,8 +28,10 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.catalog.iceberg.IcebergCtasTarget;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.rewrite.ExprRewriter;
@@ -65,7 +67,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
   /////////////////////////////////////////
 
   private final static EnumSet<THdfsFileFormat> SUPPORTED_INSERT_FORMATS =
-      EnumSet.of(THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT, THdfsFileFormat.KUDU);
+      EnumSet.of(THdfsFileFormat.ICEBERG, THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT,
+          THdfsFileFormat.KUDU);
 
   /**
    * Helper class for parsing.
@@ -127,7 +130,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
       throw new AnalysisException(String.format("CREATE TABLE AS SELECT " +
           "does not support the (%s) file format. Supported formats are: (%s)",
           createStmt_.getFileFormat().toString().replace("_", ""),
-          "PARQUET, TEXTFILE, KUDU"));
+          "PARQUET, TEXTFILE, KUDU, ICEBERG"));
     }
     if (createStmt_.getFileFormat() == THdfsFileFormat.KUDU && createStmt_.isExternal()) {
       // TODO: Add support for CTAS on external Kudu tables (see IMPALA-4318)
@@ -223,6 +226,17 @@ public class CreateTableAsSelectStmt extends StatementBase {
         tmpTable = db.createKuduCtasTarget(msTbl, createStmt_.getColumnDefs(),
             createStmt_.getPrimaryKeyColumnDefs(),
             createStmt_.getKuduPartitionParams());
+      } else if (IcebergTable.isIcebergTable(msTbl)) {
+        IcebergPartitionSpec partSpec = null;
+        if (createStmt_.getIcebergPartitionSpecs() != null &&
+           !createStmt_.getIcebergPartitionSpecs().isEmpty()) {
+          // Since this is a CREATE TABLE statement, the Iceberg table can only have
+          // a single partition spec.
+          Preconditions.checkState(createStmt_.getIcebergPartitionSpecs().size() == 1);
+          partSpec = createStmt_.getIcebergPartitionSpecs().get(0);
+        }
+        tmpTable = new IcebergCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
+            partSpec);
       } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
         tmpTable = db.createFsCtasTarget(msTbl);
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index c880697..21cc3d2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -360,7 +360,7 @@ public class InsertStmt extends StatementBase {
     }
 
     int numStaticPartitionExprs = 0;
-    if (partitionKeyValues_ != null) {
+    if (partitionKeyValues_ != null && !isIcebergTarget()) {
       for (PartitionKeyValue pkv: partitionKeyValues_) {
         Column column = table_.getColumn(pkv.getColName());
         if (column == null) {
@@ -498,13 +498,24 @@ public class InsertStmt extends StatementBase {
         throw new AnalysisException("PARTITION clause is not valid for INSERT into " +
             "HBase tables. '" + targetTableName_ + "' is an HBase table");
       } else {
-        if (table_ instanceof FeIcebergTable) {
-          throw new AnalysisException("PARTITION clause cannot be used for Iceberg " +
-              "tables.");
+        if (isIcebergTarget()) {
+          IcebergPartitionSpec partSpec =
+              ((FeIcebergTable)table_).getDefaultPartitionSpec();
+          if (partSpec == null || !partSpec.hasPartitionFields()) {
+            throw new AnalysisException("PARTITION clause is only valid for INSERT " +
+                "into partitioned table. '" + targetTableName_ + "' is not partitioned");
+          }
+          for (PartitionKeyValue pkv: partitionKeyValues_) {
+            if (pkv.isStatic()) {
+              throw new AnalysisException("Static partitioning is not supported for " +
+                  "Iceberg tables.");
+            }
+          }
+        } else {
+          // Unpartitioned table, but INSERT has PARTITION clause
+          throw new AnalysisException("PARTITION clause is only valid for INSERT into " +
+              "partitioned table. '" + targetTableName_ + "' is not partitioned");
         }
-        // Unpartitioned table, but INSERT has PARTITION clause
-        throw new AnalysisException("PARTITION clause is only valid for INSERT into " +
-            "partitioned table. '" + targetTableName_ + "' is not partitioned");
       }
     }
 
@@ -604,6 +615,10 @@ public class InsertStmt extends StatementBase {
     return true;
   }
 
+  private boolean isIcebergTarget() {
+    return table_ instanceof FeIcebergTable;
+  }
+
   /**
    * Checks that the column permutation + select list + static partition exprs + dynamic
    * partition exprs collectively cover exactly all required columns in the target table,
@@ -748,9 +763,8 @@ public class InsertStmt extends StatementBase {
     if (isKuduTable) {
       kuduPartitionColumnNames = getKuduPartitionColumnNames((FeKuduTable) table_);
     }
-    boolean isIcebergTable = table_ instanceof FeIcebergTable;
     IcebergPartitionSpec icebergPartSpec = null;
-    if (isIcebergTable) {
+    if (isIcebergTarget()) {
       icebergPartSpec = ((FeIcebergTable)table_).getDefaultPartitionSpec();
     }
 
@@ -799,7 +813,7 @@ public class InsertStmt extends StatementBase {
       }
     }
 
-    if (isIcebergTable) {
+    if (isIcebergTarget()) {
       // Add partition key expressions in the order of the Iceberg partition fields.
       addIcebergPartExprs(analyzer, widestTypeExprList, selectExprTargetColumns,
           selectListExprs, icebergPartSpec);
@@ -820,7 +834,7 @@ public class InsertStmt extends StatementBase {
       }
     }
 
-    if (isIcebergTable) {
+    if (isIcebergTarget()) {
       Preconditions.checkState(
           partitionKeyExprs_.size() == icebergPartSpec.getIcebergPartitionFieldsSize());
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CtasTargetTable.java b/fe/src/main/java/org/apache/impala/catalog/CtasTargetTable.java
new file mode 100644
index 0000000..7682833
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CtasTargetTable.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.AcidUtils;
+
+/**
+ * Helper class for creating CTAS target tables that can be used with Db and LocalDb
+ * as well.
+ */
+public abstract class CtasTargetTable implements FeTable {
+  protected final Table msTable_;
+  protected final FeDb db_;
+  protected final String name_;
+  protected final String owner_;
+
+  // colsByPos[i] refers to the ith column in the table. The first numClusteringCols are
+  // the clustering columns.
+  protected final List<Column> colsByPos_ = new ArrayList<>();
+
+  // map from lowercase column name to Column object.
+  protected final Map<String, Column> colsByName_ = new HashMap<>();
+
+  // Number of clustering columns.
+  protected int numClusteringCols_ = 0;
+
+  // Type of this table (array of struct) that mirrors the columns. Useful for analysis.
+  protected final ArrayType type_ = new ArrayType(new StructType());
+
+  public CtasTargetTable(org.apache.hadoop.hive.metastore.api.Table msTable, FeDb db,
+  String name, String owner) {
+    msTable_ = msTable;
+    db_ = db;
+    name_ = name;
+    owner_ = owner;
+  }
+
+  @Override
+  public boolean isLoaded() {
+    return false;
+  }
+
+  @Override
+  public Table getMetaStoreTable() {
+    return msTable_;
+  }
+
+  @Override
+  public String getStorageHandlerClassName() {
+    return null;
+  }
+
+  @Override
+  public TCatalogObjectType getCatalogObjectType() {
+    return TCatalogObjectType.TABLE;
+  }
+
+  @Override
+  public FeDb getDb() { return db_; }
+
+  @Override
+  public String getName() { return name_; }
+
+  @Override
+  public String getFullName() { return (db_ != null ? db_.getName() + "." : "") + name_; }
+
+  @Override
+  public TableName getTableName() {
+    return new TableName(db_ != null ? db_.getName() : null, name_);
+  }
+
+  @Override
+  public List<Column> getColumns() { return colsByPos_; }
+
+  @Override
+  public List<Column> getColumnsInHiveOrder() {
+    List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
+    if (getMetaStoreTable() != null &&
+        AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
+      // Remove synthetic "row__id" column.
+      Preconditions.checkState(columns.get(0).getName().equals("row__id"));
+      columns.remove(0);
+    }
+    columns.addAll(getClusteringColumns());
+    return Collections.unmodifiableList(columns);
+  }
+
+  @Override
+  public List<Column> getClusteringColumns() {
+    return Collections.unmodifiableList(colsByPos_.subList(0, numClusteringCols_));
+  }
+
+  @Override
+  public List<Column> getNonClusteringColumns() {
+    return Collections.unmodifiableList(colsByPos_.subList(numClusteringCols_,
+        colsByPos_.size()));
+  }
+
+  @Override
+  public List<String> getColumnNames() { return Column.toColumnNames(colsByPos_); }
+
+  @Override
+  public SqlConstraints getSqlConstraints() {
+    return null;
+  }
+
+  @Override
+  public int getNumClusteringCols() {
+    return numClusteringCols_;
+  }
+
+  @Override
+  public boolean isClusteringColumn(Column c) {
+      return c.getPosition() < numClusteringCols_;
+  }
+
+  @Override // FeTable
+  public Column getColumn(String name) { return colsByName_.get(name.toLowerCase()); }
+
+  @Override
+  public ArrayType getType() {
+    return type_;
+  }
+
+  @Override
+  public long getNumRows() { return 0; }
+
+  @Override
+  public TTableStats getTTableStats() { return null; }
+
+  @Override
+  public abstract TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions);
+
+  @Override
+  public long getWriteId() { return 0; }
+
+  @Override
+  public ValidWriteIdList getValidWriteIds() { return null; }
+
+  @Override
+  public String getOwnerUser() {
+    return owner_;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 359d62b..8ad21f0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -382,19 +382,24 @@ public interface FeIcebergTable extends FeFsTable {
         TableMetadata metadata) throws TableLoadingException {
       List<IcebergPartitionSpec> ret = new ArrayList<>();
       for (PartitionSpec spec : metadata.specs()) {
-        List<IcebergPartitionField> fields = new ArrayList<>();;
-        HashMap<String, Integer> transformParams =
-            IcebergUtil.getPartitionTransformParams(spec);
-        for (PartitionField field : spec.fields()) {
-          fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(),
-              spec.schema().findColumnName(field.sourceId()), field.name(),
-              IcebergUtil.getPartitionTransform(field, transformParams)));
-        }
-        ret.add(new IcebergPartitionSpec(spec.specId(), fields));
+        ret.add(convertPartitionSpec(spec));
       }
       return ret;
     }
 
+    public static IcebergPartitionSpec convertPartitionSpec(PartitionSpec spec)
+        throws TableLoadingException {
+      List<IcebergPartitionField> fields = new ArrayList<>();;
+      HashMap<String, Integer> transformParams =
+          IcebergUtil.getPartitionTransformParams(spec);
+      for (PartitionField field : spec.fields()) {
+        fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(),
+            spec.schema().findColumnName(field.sourceId()), field.name(),
+            IcebergUtil.getPartitionTransform(field, transformParams)));
+      }
+      return new IcebergPartitionSpec(spec.specId(), fields);
+    }
+
     public static IcebergPartitionSpec getDefaultPartitionSpec(
         FeIcebergTable feIcebergTable) {
       List<IcebergPartitionSpec> specs = feIcebergTable.getPartitionSpecs();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index a647423..77eee52 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2174,7 +2174,7 @@ public class HdfsTable extends Table implements FeFsTable {
    *  (e.g. number of blocks, files, etc) in order to compute an estimate of the metadata
    *  size of this table.
    */
-  protected THdfsTable getTHdfsTable(ThriftObjectType type, Set<Long> refPartitions) {
+  public THdfsTable getTHdfsTable(ThriftObjectType type, Set<Long> refPartitions) {
     if (type == ThriftObjectType.FULL) {
       // "full" implies all partitions should be included.
       Preconditions.checkArgument(refPartitions == null);
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
new file mode 100644
index 0000000..cd9c8ec
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogObject;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.CtasTargetTable;
+import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.FeCatalogUtils;
+import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeFsPartition;
+import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.IcebergColumn;
+import org.apache.impala.catalog.IcebergStructField;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.local.LocalDb;
+import org.apache.impala.catalog.local.LocalFsTable;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.thrift.CatalogObjectsConstants;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.THdfsPartition;
+import org.apache.impala.thrift.THdfsTable;
+import org.apache.impala.thrift.TIcebergCatalog;
+import org.apache.impala.thrift.TIcebergFileFormat;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.IcebergSchemaConverter;
+import org.apache.impala.util.IcebergUtil;
+
+/**
+ * Utility class that can be used as a temporary target table for CTAS statements.
+ * It mimics an FeIcebergTable without actually creating it via Iceberg.
+ */
+public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable {
+  private FeFsTable fsTable_;
+  private Schema iceSchema_;
+  private List<IcebergPartitionSpec> partitionSpecs_ = new ArrayList<>();
+  private TIcebergFileFormat icebergFileFormat_;
+  private TIcebergCatalog icebergCatalog_;
+  private String icebergTableLocation_;
+  private String icebergCatalogLocation_;
+  private HdfsStorageDescriptor hdfsSd_;
+
+  public IcebergCtasTarget(FeDb db, org.apache.hadoop.hive.metastore.api.Table msTbl,
+      List<ColumnDef> columnDefs, IcebergPartitionSpec partSpec)
+      throws CatalogException {
+    super(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+    createFsTable(db, msTbl);
+    createIcebergSchema(columnDefs);
+    createPartitionSpec(partSpec);
+    icebergCatalog_ = IcebergUtil.getTIcebergCatalog(msTbl);
+    setLocations();
+    icebergFileFormat_ = Utils.getIcebergFileFormat(msTbl);
+    hdfsSd_ = HdfsStorageDescriptor.fromStorageDescriptor(name_, msTable_.getSd());
+  }
+
+  private void createIcebergSchema(List<ColumnDef> columnDefs) throws CatalogException {
+    List<TColumn> tcols = new ArrayList<>();
+    for (ColumnDef col : columnDefs) {
+      tcols.add(col.toThrift());
+    }
+    try {
+      iceSchema_ = IcebergSchemaConverter.genIcebergSchema(tcols);
+      // In genIcebergSchema() we did our best to assign correct field ids to columns,
+      // but to be sure, let's use Iceberg's API function to assign field ids.
+      iceSchema_ = TypeUtil.assignIncreasingFreshIds(iceSchema_);
+    } catch (ImpalaRuntimeException ex) {
+      throw new CatalogException(
+        "Exception caught during generating Iceberg schema:", ex);
+    }
+    for (Column col : IcebergSchemaConverter.convertToImpalaSchema(iceSchema_)) {
+      addColumn((IcebergColumn)col);
+    }
+  }
+
+  private void createPartitionSpec(IcebergPartitionSpec partSpec)
+      throws CatalogException {
+    Preconditions.checkState(iceSchema_ != null);
+    PartitionSpec iceSpec = null;
+    try {
+      // Let's create an Iceberg PartitionSpec from 'partSpec' with the help of Iceberg,
+      // then convert it back to an IcebergPartitionSpec.
+      if (partSpec == null) {
+        iceSpec = PartitionSpec.unpartitioned();
+      } else {
+        iceSpec = IcebergUtil.createIcebergPartition(iceSchema_, partSpec.toThrift());
+      }
+    } catch (ImpalaRuntimeException ex) {
+      throw new CatalogException(
+        "Exception caught during generating Iceberg schema:", ex);
+    }
+    IcebergPartitionSpec resolvedIcebergSpec =
+        FeIcebergTable.Utils.convertPartitionSpec(iceSpec);
+    partitionSpecs_.add(resolvedIcebergSpec);
+  }
+
+  private void setLocations() {
+    Preconditions.checkState(msTable_ != null);
+    Preconditions.checkState(icebergCatalog_ != null);
+    if (icebergCatalog_ == TIcebergCatalog.HADOOP_CATALOG) {
+      icebergCatalogLocation_ = IcebergUtil.getIcebergCatalogLocation(msTable_);
+      TableIdentifier tId = IcebergUtil.getIcebergTableIdentifier(msTable_);
+      Namespace ns = tId.namespace();
+      List<String> components = new ArrayList<>();
+      Collections.addAll(components, ns.levels());
+      components.add(tId.name());
+      icebergTableLocation_ =
+          icebergCatalogLocation_ + "/" + String.join("/", components);
+      return;
+    }
+    Preconditions.checkState(icebergCatalog_ == TIcebergCatalog.HADOOP_TABLES ||
+                             icebergCatalog_ == TIcebergCatalog.HIVE_CATALOG);
+    icebergTableLocation_ = msTable_.getSd().getLocation();
+    icebergCatalogLocation_ = icebergTableLocation_;
+  }
+
+  private void createFsTable(FeDb db, org.apache.hadoop.hive.metastore.api.Table msTbl)
+      throws CatalogException {
+    if (db instanceof Db) {
+      fsTable_ = HdfsTable.createCtasTarget((Db)db, msTbl);
+    } else {
+      fsTable_ = LocalFsTable.createCtasTarget((LocalDb)db, msTbl);
+    }
+  }
+
+  @Override
+  public Map<String, FileDescriptor> getPathHashToFileDescMap() {
+    return Collections.<String, FileDescriptor>emptyMap();
+  }
+
+  @Override
+  public FeFsTable getFeFsTable() {
+    return fsTable_;
+  }
+
+  @Override
+  public TIcebergCatalog getIcebergCatalog() {
+    return icebergCatalog_;
+  }
+
+  @Override
+  public String getIcebergCatalogLocation() {
+    return icebergCatalogLocation_;
+  }
+
+  @Override
+  public TIcebergFileFormat getIcebergFileFormat() {
+    return icebergFileFormat_;
+  }
+
+  @Override
+  public String getIcebergTableLocation() {
+    return icebergTableLocation_;
+  }
+
+  @Override
+  public List<IcebergPartitionSpec> getPartitionSpecs() {
+    return partitionSpecs_;
+  }
+
+  @Override
+  public IcebergPartitionSpec getDefaultPartitionSpec() {
+    return partitionSpecs_.get(0);
+  }
+
+  @Override
+  public int getDefaultPartitionSpecId() {
+    return 0;
+  }
+
+  @Override
+  public Schema getIcebergSchema() {
+    return iceSchema_;
+  }
+
+  @Override
+  public long snapshotId() {
+    return -1;
+  }
+
+  public void addColumn(IcebergColumn col) {
+    colsByPos_.add(col);
+    colsByName_.put(col.getName().toLowerCase(), col);
+    ((StructType) type_.getItemType()).addField(
+        new IcebergStructField(col.getName(), col.getType(), col.getComment(),
+            col.getFieldId()));
+  }
+
+  @Override
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
+    TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.ICEBERG_TABLE,
+        FeCatalogUtils.getTColumnDescriptors(this),
+        getNumClusteringCols(),
+        getName(), db_.getName());
+
+    desc.setIcebergTable(Utils.getTIcebergTable(this));
+    desc.setHdfsTable(transformToTHdfsTable());
+    return desc;
+  }
+
+  private THdfsTable transformToTHdfsTable() {
+    if (fsTable_ instanceof HdfsTable) {
+      return transformOldToTHdfsTable();
+    } else {
+      return transformLocalToTHdfsTable();
+    }
+  }
+
+  private THdfsTable transformOldToTHdfsTable() {
+    THdfsTable hdfsTable = ((HdfsTable)fsTable_).getTHdfsTable(
+        CatalogObject.ThriftObjectType.FULL, null);
+    hdfsTable.setPrototype_partition(createPrototypePartition());
+    return hdfsTable;
+  }
+
+  private THdfsTable transformLocalToTHdfsTable() {
+    LocalFsTable localFsTable = (LocalFsTable)fsTable_;
+    Map<Long, THdfsPartition> idToPartition = new HashMap<>();
+    THdfsPartition tPrototypePartition = createPrototypePartition();
+    return new THdfsTable(localFsTable.getHdfsBaseDir(),
+        getColumnNames(), localFsTable.getNullPartitionKeyValue(),
+        FeFsTable.DEFAULT_NULL_COLUMN_VALUE, idToPartition, tPrototypePartition);
+  }
+
+  private THdfsPartition createPrototypePartition() {
+    THdfsPartition prototypePart = new THdfsPartition();
+    prototypePart.setFileFormat(IcebergUtil.toTHdfsFileFormat(icebergFileFormat_));
+    prototypePart.setBlockSize(hdfsSd_.getBlockSize());
+    prototypePart.setId(CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
+    return prototypePart;
+  }
+
+  @Override
+  public TCatalogObjectType getCatalogObjectType() {
+    return TCatalogObjectType.TABLE;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index b972ca8..39b513e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -54,7 +54,7 @@ import com.google.common.collect.Maps;
  * This class is not thread-safe. A new instance is created for
  * each catalog instance.
  */
-class LocalDb implements FeDb {
+public class LocalDb implements FeDb {
   private final LocalCatalog catalog_;
   /** The lower-case name of the database. */
   private final String name_;
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 12da212..956b964 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -369,7 +369,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     return false;
   }
 
-  protected LocalFsPartition createPrototypePartition() {
+  public LocalFsPartition createPrototypePartition() {
     Partition protoMsPartition = new Partition();
 
     // The prototype partition should not have a location set in its storage
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index c4d0259..370f84e 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -69,7 +69,8 @@ public class IcebergCatalogOpExecutor {
       String location, TCreateTableParams params) throws ImpalaRuntimeException {
     // Each table id increase from zero
     Schema schema = createIcebergSchema(params);
-    PartitionSpec spec = IcebergUtil.createIcebergPartition(schema, params);
+    PartitionSpec spec = IcebergUtil.createIcebergPartition(schema,
+        params.getPartition_spec());
     IcebergCatalog icebergCatalog = IcebergUtil.getIcebergCatalog(catalog, location);
     Table iceTable = icebergCatalog.createTable(identifier, schema, spec, location,
         params.getTable_properties());
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
index 3fdfd2e..131a7f0 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
@@ -149,7 +149,7 @@ public class IcebergSchemaConverter {
    */
   public static Schema genIcebergSchema(List<TColumn> columns)
       throws ImpalaRuntimeException {
-    iThreadLocal.set(0);
+    iThreadLocal.set(1);
     List<Types.NestedField> fields = new ArrayList<Types.NestedField>();
     for (TColumn column : columns) {
       fields.add(createIcebergField(column));
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index f1fdb87..5be0a9f 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -72,6 +72,7 @@ import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionField;
+import org.apache.impala.thrift.TIcebergPartitionSpec;
 import org.apache.impala.thrift.TIcebergPartitionTransform;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
 
@@ -178,17 +179,15 @@ public class IcebergUtil {
   }
 
   /**
-   * Build iceberg PartitionSpec by parameters.
+   * Build iceberg PartitionSpec from TIcebergPartitionSpec.
    * partition columns are all from source columns, this is different from hdfs table.
    */
   public static PartitionSpec createIcebergPartition(Schema schema,
-      TCreateTableParams params) throws ImpalaRuntimeException {
-    if (params.getPartition_spec() == null) {
-      return PartitionSpec.unpartitioned();
-    }
+      TIcebergPartitionSpec partSpec) throws ImpalaRuntimeException {
+    if (partSpec == null) return PartitionSpec.unpartitioned();
+
+    List<TIcebergPartitionField> partitionFields = partSpec.getPartition_fields();
     PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-    List<TIcebergPartitionField> partitionFields =
-        params.getPartition_spec().getPartition_fields();
     for (TIcebergPartitionField partitionField : partitionFields) {
       TIcebergPartitionTransformType transformType =
           partitionField.getTransform().getTransform_type();
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 1bb2fc8..fd6570d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2119,10 +2119,10 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Unsupported file formats
     AnalysisError("create table foo stored as sequencefile as select 1",
         "CREATE TABLE AS SELECT does not support the (SEQUENCEFILE) file format. " +
-         "Supported formats are: (PARQUET, TEXTFILE, KUDU)");
+         "Supported formats are: (PARQUET, TEXTFILE, KUDU, ICEBERG)");
     AnalysisError("create table foo stored as RCFILE as select 1",
         "CREATE TABLE AS SELECT does not support the (RCFILE) file format. " +
-         "Supported formats are: (PARQUET, TEXTFILE, KUDU)");
+         "Supported formats are: (PARQUET, TEXTFILE, KUDU, ICEBERG)");
 
     // CTAS with a WITH clause and inline view (IMPALA-1100)
     AnalyzesOk("create table test_with as with with_1 as (select 1 as int_col from " +
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-ctas.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-ctas.test
new file mode 100644
index 0000000..eddac95
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-ctas.test
@@ -0,0 +1,105 @@
+====
+---- QUERY
+CREATE TABLE value_tbl (t TINYINT, i INT, b BIGINT, s STRING, ts TIMESTAMP, d date);
+insert into value_tbl values (0, 1, 2, 'impala', '2021-02-26 16:16:59', '2021-02-26');
+====
+---- QUERY
+# Test non-partitioned table in Iceberg HiveCatalog.
+CREATE TABLE ice_ctas STORED AS ICEBERG AS SELECT i, b FROM value_tbl;
+SELECT * FROM ice_ctas;
+---- RESULTS
+1,2
+---- TYPES
+INT,BIGINT
+====
+---- QUERY
+show files in ice_ctas;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_ctas/data/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Test partitioned table in Iceberg HiveCatalog.
+# Use old PARTITIONED BY syntax.
+CREATE TABLE ice_ctas_part PARTITIONED BY(d) STORED AS ICEBERG AS SELECT s, ts, d FROM value_tbl;
+SELECT * FROM ice_ctas_part where d='2021-02-26';
+---- RESULTS
+'impala',2021-02-26 16:16:59,2021-02-26
+---- TYPES
+STRING,TIMESTAMP,DATE
+====
+---- QUERY
+show files in ice_ctas_part;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_ctas_part/data/d=2021-02-26/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# INSERT an additional row to a different partition.
+INSERT INTO ice_ctas_part VALUES ('fox','2021-02-27 16:16:59','2021-02-27');
+SELECT * FROM ice_ctas_part;
+---- RESULTS
+'impala',2021-02-26 16:16:59,2021-02-26
+'fox',2021-02-27 16:16:59,2021-02-27
+---- TYPES
+STRING,TIMESTAMP,DATE
+====
+---- QUERY
+show files in ice_ctas_part;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_ctas_part/data/d=2021-02-26/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_ctas_part/data/d=2021-02-27/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Test CTAS in Iceberg HadoopTables catalog.
+# Set table location to custom location.
+# Use PARTITION BY SPEC
+CREATE TABLE ice_ctas_hadoop_tables_part PARTITION BY SPEC (d month)
+STORED AS ICEBERG
+LOCATION '/test-warehouse/$DATABASE.db/loc_test'
+TBLPROPERTIES ('iceberg.catalog'='hadoop.tables') AS SELECT s, ts, d FROM value_tbl;
+SELECT * FROM ice_ctas_hadoop_tables_part where d='2021-02-26';
+---- RESULTS
+'impala',2021-02-26 16:16:59,2021-02-26
+---- TYPES
+STRING,TIMESTAMP,DATE
+====
+---- QUERY
+show files in ice_ctas_hadoop_tables_part;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/loc_test/data/d_month=2021-02/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Test CTAS in Iceberg HadoopCatalog catalog.
+# Set 'iceberg.catalog_location' and 'iceberg.table_identifier'
+# Partition by TRUNCATE
+# Cast TINYINT to INT.
+# INSERT additional row.
+CREATE TABLE ice_ctas_hadoop_catalog_part PARTITION BY SPEC (s truncate 3)
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
+               'iceberg.catalog_location'='/test-warehouse/$DATABASE.db/cat_loc',
+               'iceberg.table_identifier'='ns1.ns2.ctas')
+AS SELECT cast(t as INT), s, d FROM value_tbl;
+INSERT INTO ice_ctas_hadoop_catalog_part VALUES (1, 'lion', '2021-02-27');
+SELECT * FROM ice_ctas_hadoop_catalog_part;
+---- RESULTS
+0,'impala',2021-02-26
+1,'lion',2021-02-27
+---- TYPES
+INT,STRING,DATE
+====
+---- QUERY
+show files in ice_ctas_hadoop_catalog_part;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/cat_loc/ns1/ns2/ctas/data/s_trunc=imp/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/cat_loc/ns1/ns2/ctas/data/s_trunc=lio/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index bd9d033..38a61a6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -93,14 +93,6 @@ SHOW CREATE TABLE fake_iceberg_table_hadoop_catalog;
 row_regex:.*CAUSED BY: TableLoadingException: Table does not exist: fake_db.fake_table*
 ====
 ---- QUERY
-CREATE TABLE iceberg_ctas
-STORED AS ICEBERG
-TBLPROPERTIES('iceberg.catalog'='hadoop.tables')
-AS SELECT id FROM functional.alltypes;
----- CATCH
-AnalysisException: CREATE TABLE AS SELECT does not support the (ICEBERG) file format.
-====
----- QUERY
 CREATE TABLE iceberg_overwrite_bucket (i int)
 PARTITION BY SPEC (i bucket 3)
 STORED AS ICEBERG
@@ -159,7 +151,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ---- QUERY
 INSERT INTO iceberg_partitioned_insert PARTITION(level='level') VALUES(now());
 ---- CATCH
-AnalysisException: PARTITION clause cannot be used for Iceberg tables.
+Static partitioning is not supported for Iceberg tables.
 ====
 ---- QUERY
 CREATE TABLE all_colss_needed_for_insert (i int, j int, k int)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 710ebb3..6b2591a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -808,3 +808,42 @@ LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
 'iceberg.catalog'='hive.catalog', 'table_type'='ICEBERG')
 ====
+---- CREATE_TABLE
+CREATE TABLE iceberg_ctas
+PARTITION BY SPEC(id bucket 5)
+STORED AS ICEBERG
+AS SELECT id, bool_col, bigint_col FROM functional.alltypes;
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_ctas (
+  id INT NULL,
+  bool_col BOOLEAN NULL,
+  bigint_col BIGINT NULL
+)
+PARTITION BY SPEC (
+  id BUCKET 5
+)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+'iceberg.catalog'='hive.catalog', 'table_type'='ICEBERG')
+====
+---- CREATE_TABLE
+CREATE TABLE iceberg_ctas_ht
+PARTITION BY SPEC(id bucket 5)
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.catalog'='hadoop.tables')
+AS SELECT id, bool_col, bigint_col FROM functional.alltypes;
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_ctas_ht (
+  id INT NULL,
+  bool_col BOOLEAN NULL,
+  bigint_col BIGINT NULL
+)
+PARTITION BY SPEC (
+  id BUCKET 5
+)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+'iceberg.catalog'='hadoop.tables')
+====
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index 44f7fbf..8e82974 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -39,7 +39,7 @@ class TestShowCreateTable(ImpalaTestSuite):
                            "STATS_GENERATED_VIA_STATS_TASK", "last_modified_by",
                            "last_modified_time", "numFilesErasureCoded",
                            "bucketing_version", "OBJCAPABILITIES",
-                           "TRANSLATED_TO_EXTERNAL"]
+                           "TRANSLATED_TO_EXTERNAL", "previous_metadata_location"]
 
   @classmethod
   def get_workload(self):
@@ -169,6 +169,9 @@ class TestShowCreateTable(ImpalaTestSuite):
     # is not valid
     s = re.sub("TBLPROPERTIES\s*\(\s*\)", "", s)
     s = re.sub("SERDEPROPERTIES\s*\(\s*\)", "", s)
+    # By removing properties in the middle we might end up with extra whitespace,
+    # so let's remove it.
+    s = ' '.join(s.split())
     return s
 
   def __properties_map_regex(self, name):
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 19177af..20c62c8 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -76,6 +76,9 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_insert_overwrite(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-overwrite', vector, use_db=unique_database)
 
+  def test_ctas(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-ctas', vector, use_db=unique_database)
+
   def test_partition_transform_insert(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-partition-transform-insert', vector,
         use_db=unique_database)