Posted to commits@impala.apache.org by st...@apache.org on 2022/08/30 12:08:02 UTC

[impala] branch branch-4.1.1 updated (44dc157a2 -> 5cae46a4b)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 44dc157a2 IMPALA-11344: Missing slots in all cases should be allowed to be read
     new 1a0b39442 IMPALA-11034: Resolve schema of old data files in migrated Iceberg tables
     new b5c3c91ea IMPALA-10865: Fix initialize SelectStmt's groupingExprs_ in analyzeGroupingExprs
     new 15fd47ed1 IMPALA-11457 Fix regression with unknown disk id
     new 39af39f09 IMPALA-11464: Skip listing staging dirs to avoid failures on them
     new 14b2d414f IMPALA-11401,IMPALA-10794: Add logs and thread names for catalogd RPCs
     new 5cae46a4b IMPALA-11281: Load table metadata for ResetMetadataStmt

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/orc-metadata-utils.cc                  |  63 ++++-
 be/src/exec/orc-metadata-utils.h                   |   9 +-
 be/src/exec/parquet/parquet-metadata-utils.cc      |  69 ++++-
 be/src/exec/parquet/parquet-metadata-utils.h       |  27 +-
 common/fbs/CatalogObjects.fbs                      |   4 +-
 .../org/apache/impala/analysis/ColumnName.java     |   6 +
 .../org/apache/impala/analysis/FunctionName.java   |   4 +
 .../apache/impala/analysis/ResetMetadataStmt.java  |   5 +
 .../org/apache/impala/analysis/SelectStmt.java     |  15 +-
 .../apache/impala/analysis/StmtMetadataLoader.java |  12 +-
 .../java/org/apache/impala/analysis/TableName.java |   4 +
 .../impala/catalog/local/CatalogdMetaProvider.java |   4 +-
 .../org/apache/impala/common/FileSystemUtil.java   |  21 +-
 .../ExtractCompoundVerticalBarExprRule.java        |   6 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  11 +-
 .../java/org/apache/impala/service/JniCatalog.java | 146 ++++++++---
 .../java/org/apache/impala/util/CatalogOpUtil.java | 148 +++++++++++
 .../java/org/apache/impala/util/DebugUtils.java    |   4 +
 .../org/apache/impala/util/CatalogOpUtilTest.java  | 221 ++++++++++++++++
 testdata/data/README                               |  28 +++
 .../iceberg_migrated_alter_test/000000_0           | Bin 0 -> 817 bytes
 .../c9f83a82-60f4-443b-9ca4-359cad16fe12-m0.avro}  | Bin 3034 -> 3182 bytes
 ...396-1-c9f83a82-60f4-443b-9ca4-359cad16fe12.avro | Bin 0 -> 1986 bytes
 .../metadata/v1.metadata.json                      |  59 +++--
 .../metadata/v2.metadata.json                      | 105 ++++++++
 .../metadata/version-hint.text                     |   0
 .../iceberg_migrated_alter_test_orc/000000_0       | Bin 0 -> 418 bytes
 .../340a3b82-71e3-4f50-b030-aecb5a5ea730-m0.avro}  | Bin 3034 -> 3182 bytes
 ...038-1-340a3b82-71e3-4f50-b030-aecb5a5ea730.avro | Bin 0 -> 1990 bytes
 .../metadata/v1.metadata.json                      |  59 +++--
 .../metadata/v2.metadata.json                      | 105 ++++++++
 .../metadata/version-hint.text                     |   0
 .../iceberg_migrated_complex_test/000000_0         | Bin 0 -> 3006 bytes
 .../152e384f-2851-44b7-9ada-1bfbec74e9fc-m0.avro}  | Bin 3034 -> 3218 bytes
 ...148-1-152e384f-2851-44b7-9ada-1bfbec74e9fc.avro | Bin 0 -> 1988 bytes
 .../metadata/v1.metadata.json                      | 255 +++++++++++++++++++
 .../metadata/v2.metadata.json                      | 279 +++++++++++++++++++++
 .../metadata/version-hint.text                     |   0
 .../iceberg_migrated_complex_test_orc/000000_0     | Bin 0 -> 1217 bytes
 .../8588fd4b-13c1-4451-80ad-5cf71a959b94-m0.avro}  | Bin 3034 -> 3232 bytes
 ...504-1-8588fd4b-13c1-4451-80ad-5cf71a959b94.avro | Bin 0 -> 1992 bytes
 .../metadata/v1.metadata.json                      | 255 +++++++++++++++++++
 .../metadata/v2.metadata.json                      | 279 +++++++++++++++++++++
 .../metadata/version-hint.text                     |   0
 .../queries/PlannerTest/aggregation.test           |  23 +-
 ...iceberg-migrated-table-field-id-resolution.test | 208 +++++++++++++++
 tests/authorization/test_ranger.py                 |  29 +++
 tests/common/file_utils.py                         |  34 +++
 tests/metadata/test_recursive_listing.py           | 116 ++++++++-
 tests/query_test/test_iceberg.py                   |  13 +
 tests/util/filesystem_base.py                      |   7 +
 tests/util/hdfs_util.py                            |  12 +
 52 files changed, 2506 insertions(+), 139 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java
 create mode 100644 fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_alter_test/000000_0
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/airports_orc/metadata/1ebf435e-7da7-41e7-bebf-eb3ebf1b1002-m0.avro => iceberg_migrated_alter_test/metadata/c9f83a82-60f4-443b-9ca4-359cad16fe12-m0.avro} (86%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/snap-2941076094076108396-1-c9f83a82-60f4-443b-9ca4-359cad16fe12.avro
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/airports_parquet => iceberg_migrated_alter_test}/metadata/v1.metadata.json (52%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/v2.metadata.json
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/complextypestbl_iceberg_orc => iceberg_migrated_alter_test}/metadata/version-hint.text (100%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/000000_0
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/airports_orc/metadata/1ebf435e-7da7-41e7-bebf-eb3ebf1b1002-m0.avro => iceberg_migrated_alter_test_orc/metadata/340a3b82-71e3-4f50-b030-aecb5a5ea730-m0.avro} (86%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/snap-2205107170480729038-1-340a3b82-71e3-4f50-b030-aecb5a5ea730.avro
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/airports_orc => iceberg_migrated_alter_test_orc}/metadata/v1.metadata.json (51%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/v2.metadata.json
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/complextypestbl_iceberg_orc => iceberg_migrated_alter_test_orc}/metadata/version-hint.text (100%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test/000000_0
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/airports_orc/metadata/1ebf435e-7da7-41e7-bebf-eb3ebf1b1002-m0.avro => iceberg_migrated_complex_test/metadata/152e384f-2851-44b7-9ada-1bfbec74e9fc-m0.avro} (85%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/snap-3911840040574896148-1-152e384f-2851-44b7-9ada-1bfbec74e9fc.avro
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v1.metadata.json
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v2.metadata.json
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/complextypestbl_iceberg_orc => iceberg_migrated_complex_test}/metadata/version-hint.text (100%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/000000_0
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/airports_orc/metadata/1ebf435e-7da7-41e7-bebf-eb3ebf1b1002-m0.avro => iceberg_migrated_complex_test_orc/metadata/8588fd4b-13c1-4451-80ad-5cf71a959b94-m0.avro} (85%)
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/snap-3622599918649152504-1-8588fd4b-13c1-4451-80ad-5cf71a959b94.avro
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v1.metadata.json
 create mode 100644 testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v2.metadata.json
 copy testdata/data/iceberg_test/{hadoop_catalog/ice/complextypestbl_iceberg_orc => iceberg_migrated_complex_test_orc}/metadata/version-hint.text (100%)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-table-field-id-resolution.test


[impala] 05/06: IMPALA-11401,IMPALA-10794: Add logs and thread names for catalogd RPCs

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 14b2d414f44da74335c60e6c38f173e71eea20d9
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Jul 22 15:34:05 2022 +0800

    IMPALA-11401,IMPALA-10794: Add logs and thread names for catalogd RPCs
    
    We've seen catalogd throw OutOfMemoryError when serializing large
    responses (i.e. size > 2GB). However, the related table names are
    missing in the logs. Admins would like to get the table names and
    blacklist those tables until they are optimized (e.g. by reducing the
    number of partitions).
    
    To improve supportability, this patch adds logging in the catalogd RPC
    code paths to record some details of each request, and also adds
    thread name annotations to improve the readability of jstacks.
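    
    A condensed sketch of the added pattern (adapted from the JniCatalog
    changes below; "request", "doRpc" and the method name are placeholders
    for the per-RPC details):
    
      long start = System.currentTimeMillis();
      String shortDesc = CatalogOpUtil.getShortDescForExecDdl(request);
      LOG.info("execDdl request: " + shortDesc);
      try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
        byte[] res = serializer.serialize(doRpc(request));
        JniUtil.logResponse(res.length, start, request, "execDdl");
        return res;
      } catch (Throwable e) {
        // Log the request's short description and the elapsed time so that
        // failures (e.g. OutOfMemoryError while serializing the response)
        // can be tied to a concrete table or statement.
        LOG.error("Error in execDdl for {}. Time spent: {}.", shortDesc,
            PrintUtils.printTimeMs(System.currentTimeMillis() - start));
        throw e;
      }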
    
    Tests:
     - Add unit tests for short descriptions of requests.
     - Manually add code to throw OutOfMemoryError and verify the logs
       appear as expected.
     - Run test_concurrent_ddls.py and metadata tests. Capture jstacks and
       verify the thread annotations are shown.
     - Run CORE tests
    
    Change-Id: Iac7f2eda8b95643a3d3c3bef64ea71b67b20595a
    Reviewed-on: http://gerrit.cloudera.org:8080/18772
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18916
    Tested-by: Quanlong Huang <hu...@gmail.com>
---
 .../org/apache/impala/analysis/ColumnName.java     |   6 +
 .../org/apache/impala/analysis/FunctionName.java   |   4 +
 .../apache/impala/analysis/ResetMetadataStmt.java  |   5 +
 .../java/org/apache/impala/analysis/TableName.java |   4 +
 .../impala/catalog/local/CatalogdMetaProvider.java |   4 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  11 +-
 .../java/org/apache/impala/service/JniCatalog.java | 146 +++++++++++---
 .../java/org/apache/impala/util/CatalogOpUtil.java | 148 ++++++++++++++
 .../org/apache/impala/util/CatalogOpUtilTest.java  | 221 +++++++++++++++++++++
 9 files changed, 513 insertions(+), 36 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnName.java b/fe/src/main/java/org/apache/impala/analysis/ColumnName.java
index 8eb285bce..41d50a1ab 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnName.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnName.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import com.google.common.base.Preconditions;
+import org.apache.impala.thrift.TColumnName;
 
 /**
  * Represents a column name that optionally includes its database name.
@@ -36,4 +37,9 @@ public class ColumnName {
   public TableName getTableName() { return tableName_; }
 
   public String getColumnName() { return columnName_; }
+
+  public static String thriftToString(TColumnName colName) {
+    return TableName.thriftToString(colName.getTable_name()) + "." +
+        colName.getColumn_name();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionName.java b/fe/src/main/java/org/apache/impala/analysis/FunctionName.java
index 288f7cb6e..b3093c0c1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionName.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionName.java
@@ -161,4 +161,8 @@ public class FunctionName {
   public static FunctionName fromThrift(TFunctionName fnName) {
     return new FunctionName(fnName.getDb_name(), fnName.getFunction_name());
   }
+
+  public static String thriftToString(TFunctionName fnName) {
+    return fromThrift(fnName).toString();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index c892a2e65..87d4629f7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -126,6 +126,11 @@ public class ResetMetadataStmt extends StatementBase {
   @VisibleForTesting
   protected Action getAction() { return action_; }
 
+  @VisibleForTesting
+  public void setRequestingUser(User user) {
+    requestingUser_ = user;
+  }
+
   @Override
   public void collectTableRefs(List<TableRef> tblRefs) {
     if (tableName_ != null && partitionSpec_ != null) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableName.java b/fe/src/main/java/org/apache/impala/analysis/TableName.java
index ae6426e33..e413cca2b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableName.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableName.java
@@ -126,6 +126,10 @@ public class TableName {
     return new TableName(tableName.getDb_name(), tableName.getTable_name());
   }
 
+  public static String thriftToString(TTableName tableName) {
+    return fromThrift(tableName).toString();
+  }
+
   public TTableName toThrift() { return new TTableName(db_, tbl_); }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 794124f44..4c4ec1091 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -97,6 +97,7 @@ import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
+import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -511,7 +512,8 @@ public class CatalogdMetaProvider implements MetaProvider {
     Stopwatch sw = Stopwatch.createStarted();
     boolean hit = false;
     boolean isPiggybacked = false;
-    try {
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+        "LoadWithCaching for " + itemString)) {
       CompletableFuture<Object> f = new CompletableFuture<Object>();
       // NOTE: the Cache ensures that this is an atomic operation of either returning
       // an existing value or inserting our own. Only one thread can think it is the
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 76ac5e6f7..43c9a50a1 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -230,6 +230,7 @@ import org.apache.impala.thrift.TUpdateCatalogResponse;
 import org.apache.impala.thrift.TUpdatedPartition;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.HdfsCachingUtil;
@@ -237,6 +238,7 @@ import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo;
+import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -5458,7 +5460,10 @@ public class CatalogOpExecutor {
 
     // Add partitions to metastore.
     Map<String, Long> partitionToEventId = Maps.newHashMap();
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+    String annotation = String.format("Recovering %d partitions for %s",
+        hmsPartitions.size(), tbl.getFullName());
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation);
+        MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       List<Partition> addedPartitions = addHmsPartitions(msClient, tbl, hmsPartitions,
           partitionToEventId, true);
       addHdfsPartitions(msClient, tbl, addedPartitions, partitionToEventId);
@@ -5997,9 +6002,7 @@ public class CatalogOpExecutor {
    */
   public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
       throws CatalogException {
-    String cmdString = String.format("%s issued by %s",
-        req.is_refresh ? "REFRESH":"INVALIDATE",
-        req.header != null ? req.header.requesting_user : " unknown user");
+    String cmdString = CatalogOpUtil.getShortDescForReset(req);
     TResetMetadataResponse resp = new TResetMetadataResponse();
     resp.setResult(new TCatalogUpdateResult());
     resp.getResult().setCatalog_service_id(JniCatalog.getServiceId());
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index e8f2951f4..126d08f63 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.AuthorizationManager;
+import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Db;
@@ -46,6 +47,7 @@ import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
@@ -78,8 +80,10 @@ import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.AuthorizationUtil;
+import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.PatternMatcher;
+import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -219,11 +223,20 @@ public class JniCatalog {
     long start = System.currentTimeMillis();
     TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq);
-    byte[] res = new TSerializer(protocolFactory_).serialize(new TGetCatalogDeltaResponse(
-        catalog_.getCatalogDelta(params.getNative_catalog_server_ptr(),
-        params.getFrom_version())));
-    JniUtil.logResponse(res.length, start, params, "getCatalogDelta");
-    return res;
+    TSerializer serializer = new TSerializer(protocolFactory_);
+    String shortDesc = "getting catalog delta from version " + params.getFrom_version();
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
+      byte[] res = serializer.serialize(new TGetCatalogDeltaResponse(
+          catalog_.getCatalogDelta(params.getNative_catalog_server_ptr(),
+              params.getFrom_version())));
+      JniUtil.logResponse(res.length, start, params, "getCatalogDelta");
+      return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in {}. Time spent: {}.",
+          shortDesc, PrintUtils.printTimeMs(duration));
+      throw e;
+    }
   }
 
   /**
@@ -236,17 +249,25 @@ public class JniCatalog {
   /**
    * Executes the given DDL request and returns the result.
    */
-  public byte[] execDdl(byte[] thriftDdlExecReq) throws ImpalaException {
+  public byte[] execDdl(byte[] thriftDdlExecReq) throws ImpalaException, TException {
     long start = System.currentTimeMillis();
     TDdlExecRequest params = new TDdlExecRequest();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDdlExecReq);
     TSerializer serializer = new TSerializer(protocolFactory_);
-    try {
+    String shortDesc = CatalogOpUtil.getShortDescForExecDdl(params);
+    LOG.info("execDdl request: " + shortDesc);
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
       byte[] res = serializer.serialize(catalogOpExecutor_.execDdlRequest(params));
       JniUtil.logResponse(res.length, start, params, "execDdl");
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("finished execDdl request: {}. Time spent: {}",
+          shortDesc, PrintUtils.printTimeMs(duration));
       return res;
-    } catch (TException e) {
-      throw new InternalException(e.getMessage());
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in execDdl for {}. Time spent: {}.",
+          shortDesc, PrintUtils.printTimeMs(duration));
+      throw e;
     }
   }
 
@@ -260,10 +281,20 @@ public class JniCatalog {
     JniUtil.deserializeThrift(protocolFactory_, req, thriftResetMetadataReq);
     TSerializer serializer = new TSerializer(protocolFactory_);
     catalogOperationUsage.increment(req);
-    try {
+    String shortDesc = CatalogOpUtil.getShortDescForReset(req);
+    LOG.info("resetMetadata request: " + shortDesc);
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
       byte[] res = serializer.serialize(catalogOpExecutor_.execResetMetadata(req));
       JniUtil.logResponse(res.length, start, req, "resetMetadata");
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("finished resetMetadata request: {}. Time spent: {}",
+          shortDesc, PrintUtils.printTimeMs(duration));
       return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in resetMetadata for {}. Time spent: {}.",
+          shortDesc, PrintUtils.printTimeMs(duration));
+      throw e;
     } finally {
       catalogOperationUsage.decrement(req);
     }
@@ -317,9 +348,17 @@ public class JniCatalog {
     long start = System.currentTimeMillis();
     TGetTableMetricsParams params = new TGetTableMetricsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, getTableMetricsParams);
-    String res = catalog_.getTableMetrics(params.table_name);
-    JniUtil.logResponse(res.length(), start, params, "getTableMetrics");
-    return res;
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+        "getTableMetrics " + params.table_name)) {
+      String res = catalog_.getTableMetrics(params.table_name);
+      JniUtil.logResponse(res.length(), start, params, "getTableMetrics");
+      return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in getTableMetrics {}. Time spent: {}.", params.table_name,
+          PrintUtils.printTimeMs(duration));
+      throw e;
+    }
   }
 
   /**
@@ -328,12 +367,21 @@ public class JniCatalog {
   public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
       TException {
     long start = System.currentTimeMillis();
-    TCatalogObject objectDescription = new TCatalogObject();
-    JniUtil.deserializeThrift(protocolFactory_, objectDescription, thriftParams);
+    TCatalogObject objectDesc = new TCatalogObject();
+    JniUtil.deserializeThrift(protocolFactory_, objectDesc, thriftParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(catalog_.getTCatalogObject(objectDescription));
-    JniUtil.logResponse(res.length, start, objectDescription, "getCatalogObject");
-    return res;
+    String shortDesc = "getting thrift catalog object of "
+        + Catalog.toCatalogObjectKey(objectDesc);
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
+      byte[] res = serializer.serialize(catalog_.getTCatalogObject(objectDesc));
+      JniUtil.logResponse(res.length, start, objectDesc, "getCatalogObject");
+      return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in {}. Time spent: {}.", shortDesc,
+          PrintUtils.printTimeMs(duration));
+      throw e;
+    }
   }
 
   /**
@@ -343,12 +391,21 @@ public class JniCatalog {
   public String getJsonCatalogObject(byte[] thriftParams) throws ImpalaException,
       TException {
     long start = System.currentTimeMillis();
-    TCatalogObject objectDescription = new TCatalogObject();
-    JniUtil.deserializeThrift(protocolFactory_, objectDescription, thriftParams);
+    TCatalogObject objectDesc = new TCatalogObject();
+    JniUtil.deserializeThrift(protocolFactory_, objectDesc, thriftParams);
     TSerializer jsonSerializer = new TSerializer(new TSimpleJSONProtocol.Factory());
-    String res = jsonSerializer.toString(catalog_.getTCatalogObject(objectDescription));
-    JniUtil.logResponse(res.length(), start, objectDescription, "getJsonCatalogObject");
-    return res;
+    String shortDesc = "getting json catalog object of "
+        + Catalog.toCatalogObjectKey(objectDesc);
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
+      String res = jsonSerializer.toString(catalog_.getTCatalogObject(objectDesc));
+      JniUtil.logResponse(res.length(), start, objectDesc, "getJsonCatalogObject");
+      return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in {}. Time spent: {}.", shortDesc,
+          PrintUtils.printTimeMs(duration));
+      throw e;
+    }
   }
 
   public byte[] getPartialCatalogObject(byte[] thriftParams) throws ImpalaException,
@@ -358,9 +415,16 @@ public class JniCatalog {
         new TGetPartialCatalogObjectRequest();
     JniUtil.deserializeThrift(protocolFactory_, req, thriftParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(catalog_.getPartialCatalogObject(req));
-    JniUtil.logResponse(res.length, start, req, "getPartialCatalogObject");
-    return res;
+    try {
+      byte[] res = serializer.serialize(catalog_.getPartialCatalogObject(req));
+      JniUtil.logResponse(res.length, start, req, "getPartialCatalogObject");
+      return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in getting PartialCatalogObject of {}. Time spent: {}.",
+          Catalog.toCatalogObjectKey(req.object_desc), PrintUtils.printTimeMs(duration));
+      throw e;
+    }
   }
 
   /**
@@ -405,15 +469,24 @@ public class JniCatalog {
     JniUtil.deserializeThrift(protocolFactory_, request, thriftParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
     TGetPartitionStatsResponse response = new TGetPartitionStatsResponse();
-    try {
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+        "Getting partition stats of " + request.table_name)) {
       response.setPartition_stats(catalog_.getPartitionStats(request));
     } catch (CatalogException e) {
       response.setStatus(
           new TStatus(TErrorCode.INTERNAL_ERROR, ImmutableList.of(e.getMessage())));
     }
-    byte[] res = serializer.serialize(response);
-    JniUtil.logResponse(res.length, start, request, "getPartitionStats");
-    return res;
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+        "Serializing partition stats of " + request.table_name)) {
+      byte[] res = serializer.serialize(response);
+      JniUtil.logResponse(res.length, start, request, "getPartitionStats");
+      return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in serializing partition stats of {}. Time spent in method: {}.",
+          request.table_name, PrintUtils.printTimeMs(duration));
+      throw e;
+    }
   }
 
   /**
@@ -427,10 +500,21 @@ public class JniCatalog {
     JniUtil.deserializeThrift(protocolFactory_, request, thriftUpdateCatalog);
     TSerializer serializer = new TSerializer(protocolFactory_);
     catalogOperationUsage.increment(request);
-    try {
+    String shortDesc = String.format("updateCatalog for %s.%s",
+        request.db_name, request.target_table);
+    LOG.info(shortDesc);
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
       byte[] res = serializer.serialize(catalogOpExecutor_.updateCatalog(request));
       JniUtil.logResponse(res.length, start, request, "updateCatalog");
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("finished {}. Time spent: {}", shortDesc,
+          PrintUtils.printTimeMs(duration));
       return res;
+    } catch (Throwable e) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.error("Error in {}. Time spent: {}.", shortDesc,
+          PrintUtils.printTimeMs(duration));
+      throw e;
     } finally {
       catalogOperationUsage.decrement(request);
     }
diff --git a/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java
new file mode 100644
index 000000000..0b96a3ecb
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.apache.impala.analysis.ColumnName;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.thrift.TCommentOnParams;
+import org.apache.impala.thrift.TDdlExecRequest;
+import org.apache.impala.thrift.TResetMetadataRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogOpUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogOpUtil.class);
+
+  /**
+   * Get a short description for the execDdl request.
+   */
+  public static String getShortDescForExecDdl(TDdlExecRequest req) {
+    String target;
+    try {
+      switch (req.ddl_type) {
+        case ALTER_DATABASE:
+          target = req.getAlter_db_params().getDb();
+          break;
+        case ALTER_TABLE:
+          target = TableName.thriftToString(req.getAlter_table_params().getTable_name());
+          break;
+        case ALTER_VIEW:
+          target = TableName.thriftToString(req.getAlter_view_params().getView_name());
+          break;
+        case CREATE_DATABASE:
+          target = req.getCreate_db_params().getDb();
+          break;
+        case CREATE_TABLE_AS_SELECT:
+        case CREATE_TABLE:
+          target = TableName.thriftToString(req.getCreate_table_params().getTable_name());
+          break;
+        case CREATE_TABLE_LIKE:
+          target = TableName.thriftToString(
+              req.getCreate_table_like_params().getTable_name());
+          break;
+        case CREATE_VIEW:
+          target = TableName.thriftToString(req.getCreate_view_params().getView_name());
+          break;
+        case CREATE_FUNCTION:
+          target = FunctionName.thriftToString(
+              req.getCreate_fn_params().getFn().getName());
+          break;
+        case COMMENT_ON: {
+          TCommentOnParams params = req.getComment_on_params();
+          if (params.isSetDb()) {
+            target = "DB " + params.getDb();
+          } else if (params.isSetTable_name()) {
+            target = "TABLE " + TableName.thriftToString(params.getTable_name());
+          } else if (params.isSetColumn_name()) {
+            target = "COLUMN " + ColumnName.thriftToString(params.getColumn_name());
+          } else {
+            target = "";
+          }
+          break;
+        }
+        case DROP_STATS:
+          target = TableName.thriftToString(req.getDrop_stats_params().getTable_name());
+          break;
+        case DROP_DATABASE:
+          target = req.getDrop_db_params().getDb();
+          break;
+        case DROP_TABLE:
+        case DROP_VIEW:
+          target = TableName.thriftToString(
+              req.getDrop_table_or_view_params().getTable_name());
+          break;
+        case TRUNCATE_TABLE:
+          target = TableName.thriftToString(req.getTruncate_params().getTable_name());
+          break;
+        case DROP_FUNCTION:
+          target = FunctionName.thriftToString(req.getDrop_fn_params().fn_name);
+          break;
+        case CREATE_ROLE:
+        case DROP_ROLE:
+          target = req.getCreate_drop_role_params().getRole_name();
+          break;
+        case GRANT_ROLE:
+        case REVOKE_ROLE:
+          target = req.getGrant_revoke_role_params().getRole_names() + " GROUP " +
+              req.getGrant_revoke_role_params().getGroup_names();
+          break;
+        case GRANT_PRIVILEGE:
+          target = "TO " + req.getGrant_revoke_priv_params().getPrincipal_name();
+          break;
+        case REVOKE_PRIVILEGE:
+          target = "FROM " + req.getGrant_revoke_priv_params().getPrincipal_name();
+          break;
+        default:
+          target = "";
+      }
+    } catch (Throwable t) {
+      // This method is used for all DDL RPCs. We should not fail them due
+      // to errors that happen here. Catch all exceptions and just log them.
+      target = "unknown target";
+      LOG.error("Failed to get the target for request", t);
+    }
+
+    String user = "unknown user";
+    if (req.isSetHeader() && req.header.isSetRequesting_user()) {
+      user = req.header.requesting_user;
+    }
+    return String.format("%s%s issued by %s", req.ddl_type, " " + target, user);
+  }
+
+  /**
+   * Get a short description for the resetMetadata request.
+   */
+  public static String getShortDescForReset(TResetMetadataRequest req) {
+    String cmd = req.is_refresh ? "REFRESH " : "INVALIDATE ";
+    if (req.isSetDb_name()) {
+      if (req.is_refresh) cmd += "FUNCTIONS IN ";
+      cmd += "DATABASE " + req.getDb_name();
+    } else if (req.isSetTable_name()) {
+      cmd += "TABLE " + TableName.fromThrift(req.getTable_name());
+      if (req.isSetPartition_spec()) cmd += " PARTITIONS";
+    } else if (req.isAuthorization()) {
+      cmd += "AUTHORIZATION";
+    } else {
+      // Global INVALIDATE METADATA
+      cmd += "ALL";
+    }
+    String user = req.header != null ? req.header.requesting_user : "unknown user";
+    return String.format("%s issued by %s", cmd, user);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java
new file mode 100644
index 000000000..11d8a9f62
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.apache.impala.analysis.GrantRevokePrivStmt;
+import org.apache.impala.analysis.GrantRevokeRoleStmt;
+import org.apache.impala.analysis.PrivilegeSpec;
+import org.apache.impala.analysis.ResetMetadataStmt;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.authorization.User;
+import org.apache.impala.thrift.TAlterDbParams;
+import org.apache.impala.thrift.TAlterDbType;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TColumnName;
+import org.apache.impala.thrift.TCommentOnParams;
+import org.apache.impala.thrift.TCreateDbParams;
+import org.apache.impala.thrift.TCreateDropRoleParams;
+import org.apache.impala.thrift.TCreateFunctionParams;
+import org.apache.impala.thrift.TCreateOrAlterViewParams;
+import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.TDdlExecRequest;
+import org.apache.impala.thrift.TDdlQueryOptions;
+import org.apache.impala.thrift.TDdlType;
+import org.apache.impala.thrift.TDropDbParams;
+import org.apache.impala.thrift.TDropFunctionParams;
+import org.apache.impala.thrift.TDropTableOrViewParams;
+import org.apache.impala.thrift.TFunction;
+import org.apache.impala.thrift.TFunctionName;
+import org.apache.impala.thrift.TGrantRevokePrivParams;
+import org.apache.impala.thrift.TGrantRevokeRoleParams;
+import org.apache.impala.thrift.TPrincipalType;
+import org.apache.impala.thrift.TPrivilegeLevel;
+import org.apache.impala.thrift.TResetMetadataRequest;
+import org.apache.impala.thrift.TTableName;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class CatalogOpUtilTest {
+
+  private void testResetStmt(ResetMetadataStmt stmt, User user, String expected)
+      throws Exception {
+    stmt.setRequestingUser(user);
+    assertEquals(expected, CatalogOpUtil.getShortDescForReset(stmt.toThrift()));
+  }
+
+  @Test
+  public void testResetMetadataDesc() throws Exception {
+    User user = new User("Alice");
+    TableName tblName = new TableName("default", "tbl");
+
+    testResetStmt(ResetMetadataStmt.createInvalidateStmt(),
+        user, "INVALIDATE ALL issued by Alice");
+    testResetStmt(ResetMetadataStmt.createInvalidateStmt(tblName),
+        user, "INVALIDATE TABLE default.tbl issued by Alice");
+    testResetStmt(ResetMetadataStmt.createRefreshTableStmt(tblName),
+        user, "REFRESH TABLE default.tbl issued by Alice");
+    testResetStmt(ResetMetadataStmt.createRefreshFunctionsStmt("db1"),
+        user, "REFRESH FUNCTIONS IN DATABASE db1 issued by Alice");
+    testResetStmt(ResetMetadataStmt.createRefreshAuthorizationStmt(),
+        user, "REFRESH AUTHORIZATION issued by Alice");
+
+    // Test REFRESH PARTITIONS using a fake partition spec
+    ResetMetadataStmt stmt = ResetMetadataStmt.createRefreshTableStmt(tblName);
+    stmt.setRequestingUser(user);
+    TResetMetadataRequest req = stmt.toThrift();
+    req.setPartition_spec(Collections.emptyList());
+    assertEquals("REFRESH TABLE default.tbl PARTITIONS issued by Alice",
+        CatalogOpUtil.getShortDescForReset(req));
+  }
+
+  @Test
+  public void testDdlDesc() {
+    TDdlExecRequest req;
+    TTableName tblName = new TTableName("db1", "tbl1");
+
+    req = new TDdlExecRequest();
+    req.setQuery_options(new TDdlQueryOptions());
+    req.setDdl_type(TDdlType.CREATE_DATABASE);
+    TCreateDbParams createDbParams = new TCreateDbParams();
+    createDbParams.setDb("db1");
+    req.setCreate_db_params(createDbParams);
+    assertEquals("CREATE_DATABASE db1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.DROP_DATABASE);
+    TDropDbParams dropDbParams = new TDropDbParams();
+    dropDbParams.setDb("db1");
+    req.setDrop_db_params(dropDbParams);
+    assertEquals("DROP_DATABASE db1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.ALTER_DATABASE);
+    TAlterDbParams alterDbParams = new TAlterDbParams();
+    alterDbParams.setAlter_type(TAlterDbType.SET_OWNER);
+    alterDbParams.setDb("db1");
+    req.setAlter_db_params(alterDbParams);
+    assertEquals("ALTER_DATABASE db1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.CREATE_TABLE);
+    TCreateTableParams createTableParams = new TCreateTableParams();
+    createTableParams.setTable_name(tblName);
+    req.setCreate_table_params(createTableParams);
+    assertEquals("CREATE_TABLE db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    req.setDdl_type(TDdlType.CREATE_TABLE_AS_SELECT);
+    assertEquals("CREATE_TABLE_AS_SELECT db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.ALTER_TABLE);
+    TAlterTableParams alterTableParams = new TAlterTableParams();
+    alterTableParams.setTable_name(tblName);
+    req.setAlter_table_params(alterTableParams);
+    assertEquals("ALTER_TABLE db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.CREATE_VIEW);
+    TCreateOrAlterViewParams alterViewParams = new TCreateOrAlterViewParams();
+    alterViewParams.setView_name(tblName);
+    req.setCreate_view_params(alterViewParams);
+    assertEquals("CREATE_VIEW db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    req.setDdl_type(TDdlType.ALTER_VIEW);
+    req.setAlter_view_params(alterViewParams);
+    assertEquals("ALTER_VIEW db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.DROP_TABLE);
+    TDropTableOrViewParams dropTableOrViewParams = new TDropTableOrViewParams();
+    dropTableOrViewParams.setTable_name(tblName);
+    req.setDrop_table_or_view_params(dropTableOrViewParams);
+    assertEquals("DROP_TABLE db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    req.setDdl_type(TDdlType.DROP_VIEW);
+    assertEquals("DROP_VIEW db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.COMMENT_ON);
+    TCommentOnParams commentOnParams = new TCommentOnParams();
+    commentOnParams.setDb("db1");
+    req.setComment_on_params(commentOnParams);
+    assertEquals("COMMENT_ON DB db1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    commentOnParams.clear();
+    commentOnParams.setTable_name(tblName);
+    req.setComment_on_params(commentOnParams);
+    assertEquals("COMMENT_ON TABLE db1.tbl1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    commentOnParams.clear();
+    commentOnParams.setColumn_name(new TColumnName(tblName, "col1"));
+    req.setComment_on_params(commentOnParams);
+    assertEquals("COMMENT_ON COLUMN db1.tbl1.col1 issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.CREATE_FUNCTION);
+    TCreateFunctionParams createFunctionParams = new TCreateFunctionParams();
+    TFunction fn = new TFunction();
+    fn.setName(new TFunctionName("my_func"));
+    createFunctionParams.setFn(fn);
+    req.setCreate_fn_params(createFunctionParams);
+    assertEquals("CREATE_FUNCTION my_func issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.DROP_FUNCTION);
+    TDropFunctionParams dropFunctionParams = new TDropFunctionParams();
+    dropFunctionParams.setFn_name(new TFunctionName("my_func"));
+    req.setDrop_fn_params(dropFunctionParams);
+    assertEquals("DROP_FUNCTION my_func issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.CREATE_ROLE);
+    TCreateDropRoleParams createDropRoleParams = new TCreateDropRoleParams();
+    createDropRoleParams.setRole_name("my_role");
+    req.setCreate_drop_role_params(createDropRoleParams);
+    assertEquals("CREATE_ROLE my_role issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    req.setDdl_type(TDdlType.DROP_ROLE);
+    assertEquals("DROP_ROLE my_role issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.GRANT_ROLE);
+    TGrantRevokeRoleParams grantRevokeRoleParams;
+    grantRevokeRoleParams = new GrantRevokeRoleStmt("my_role", "my_group", true)
+        .toThrift();
+    req.setGrant_revoke_role_params(grantRevokeRoleParams);
+    assertEquals("GRANT_ROLE [my_role] GROUP [my_group] issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    req.setDdl_type(TDdlType.REVOKE_ROLE);
+    assertEquals("REVOKE_ROLE [my_role] GROUP [my_group] issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+
+    req.setDdl_type(TDdlType.GRANT_PRIVILEGE);
+    TGrantRevokePrivParams grantRevokePrivParams = new GrantRevokePrivStmt("my_role",
+        PrivilegeSpec.createServerScopedPriv(TPrivilegeLevel.SELECT), true, false,
+        TPrincipalType.ROLE).toThrift();
+    req.setGrant_revoke_priv_params(grantRevokePrivParams);
+    assertEquals("GRANT_PRIVILEGE TO my_role issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+    req.setDdl_type(TDdlType.REVOKE_PRIVILEGE);
+    assertEquals("REVOKE_PRIVILEGE FROM my_role issued by unknown user",
+        CatalogOpUtil.getShortDescForExecDdl(req));
+  }
+}


[impala] 02/06: IMPALA-10865: Fix initialize SelectStmt's groupingExprs_ in analyzeGroupingExprs

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b5c3c91eaf9ec0945eec8cb3a1c9b8ac5b5dc635
Author: guojingfeng <gu...@tencent.com>
AuthorDate: Tue Aug 17 11:43:06 2021 +0800

    IMPALA-10865: Fix initialize SelectStmt's groupingExprs_ in analyzeGroupingExprs
    
    This patch rolls back some changes of IMPALA-9620. IMPALA-9620 re-
    initialized SelectStmt's groupingExprs_ to ensure that group-by and
    CNF exprs are analyzed. But the follow-up patch for IMPALA-9693
    explicitly analyzes exprs, which is equivalent to what IMPALA-9620
    did, so this rollback is safe here.
    
    In general, the analysis algorithm is:
    1. Analyze the stmt tree and make copies of expressions.
    2. Rewrite selected expressions; **rewrite rules should ensure
       rewritten exprs are analyzed**.
    3. Make the copied expressions analyzed.
    4. Re-analyze the tree.
    
    The problem is that if we change SelectStmt's groupingExprs_, then in
    the re-analyze phase a column alias will be substituted with an Expr
    that duplicates the original column, and the duplicate will be removed
    in `buildAggregateExprs` (see the new aggregation.test case in the
    diff below for a concrete query).
    
    Another reason why this patch is submitted is that re-initializing
    SelectStmt's groupingExprs_ causes other problems; IMPALA-10096 is a
    typical case. See the JIRA for the detailed exceptions.
    
    Besides, this patch modifies ExtractCompoundVerticalBarExprRule to do
    an explicit analyze to ensure exprs are rewritten.
    
    Tests:
    - Added a new test to aggregation.test and it passed
    - Ran all FE tests and they passed
    
    Change-Id: I9d1779e6c282d9fd02beacf5ddfafcc5c0baf3b0
    Reviewed-on: http://gerrit.cloudera.org:8080/17781
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18913
    Tested-by: Quanlong Huang <hu...@gmail.com>
    Reviewed-by: Tamas Mate <tm...@apache.org>
---
 .../org/apache/impala/analysis/SelectStmt.java     | 15 +++-----------
 .../ExtractCompoundVerticalBarExprRule.java        |  6 ++++--
 .../queries/PlannerTest/aggregation.test           | 23 +++++++++++++++++++++-
 3 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 226c09e6d..0784f0046 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -461,6 +461,9 @@ public class SelectStmt extends QueryStmt {
           colLabels_.add(label);
         }
       }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Analyzed select clause aliasSmap={}", aliasSmap_.debugString());
+      }
     }
 
     private void verifyResultExprs() throws AnalysisException {
@@ -966,18 +969,6 @@ public class SelectStmt extends QueryStmt {
                   + groupingExprsCopy_.get(i).toSql());
         }
       }
-      // initialize groupingExprs_ with the analyzed version
-      // use the original ordinal if the analyzed expr is a INT literal
-      List<Expr> groupingExprs = new ArrayList<>();
-      for (int i = 0; i < groupingExprsCopy_.size(); ++i) {
-        Expr expr = groupingExprsCopy_.get(i);
-        if (expr instanceof NumericLiteral && Expr.IS_INT_LITERAL.apply(expr)) {
-          groupingExprs.add(groupingExprs_.get(i).clone());
-        } else {
-          groupingExprs.add(expr);
-        }
-      }
-      groupingExprs_ = groupingExprs;
 
       if (groupByClause_ != null && groupByClause_.hasGroupingSets()) {
         groupByClause_.analyzeGroupingSets(groupingExprsCopy_);
diff --git a/fe/src/main/java/org/apache/impala/rewrite/ExtractCompoundVerticalBarExprRule.java b/fe/src/main/java/org/apache/impala/rewrite/ExtractCompoundVerticalBarExprRule.java
index e51797021..8fc27b826 100644
--- a/fe/src/main/java/org/apache/impala/rewrite/ExtractCompoundVerticalBarExprRule.java
+++ b/fe/src/main/java/org/apache/impala/rewrite/ExtractCompoundVerticalBarExprRule.java
@@ -33,10 +33,12 @@ public class ExtractCompoundVerticalBarExprRule implements ExprRewriteRule {
 
   @Override
   public Expr apply(Expr expr, Analyzer analyzer) {
-    if (!expr.isAnalyzed()) return expr;
-
     if (expr instanceof CompoundVerticalBarExpr) {
       CompoundVerticalBarExpr pred = (CompoundVerticalBarExpr) expr;
+      if (!expr.isAnalyzed()) {
+        pred = (CompoundVerticalBarExpr) expr.clone();
+        pred.analyzeNoThrow(analyzer);
+      }
       return pred.getEncapsulatedExpr();
     }
     return expr;
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index 2ac3f53d4..5d10e41a2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -1665,10 +1665,31 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
-|  group by: 2 + 1, id
+|  group by: 3, id
 |  row-size=18B cardinality=10
 |
 00:SCAN HDFS [functional.dimtbl]
    HDFS partitions=1/1 files=1 size=171B
    row-size=8B cardinality=10
 ====
+# IMPALA-10865: Group by expr with column alias reference errors in
+# re-analyze phase.
+SELECT ss_item_sk ss_item_sk_group, ss_item_sk+300 ss_item_sk,
+    count(ss_ticket_number)
+FROM tpcds.store_sales a
+WHERE ss_sold_date_sk > cast('245263' AS INT)
+GROUP BY ss_item_sk_group,
+         ss_item_sk
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(ss_ticket_number)
+|  group by: ss_item_sk, ss_item_sk + 300
+|  row-size=24B cardinality=2.75M
+|
+00:SCAN HDFS [tpcds.store_sales a]
+   partition predicates: ss_sold_date_sk > 245263
+   HDFS partitions=1823/1824 files=1823 size=195.68MB
+   row-size=16B cardinality=2.75M
+====


[impala] 04/06: IMPALA-11464: Skip listing staging dirs to avoid failures on them

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 39af39f0905652ff524baa1a864f9c17e9c1b196
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Sun Jul 31 15:47:31 2022 +0800

    IMPALA-11464: Skip listing staging dirs to avoid failures on them
    
    Hive and other systems generate staging/tmp dirs under the
    table/partition folders while loading/inserting data, and remove them
    when the operation is done. File metadata loading in catalogd can fail
    if it is listing the files of such a dir when it disappears. This was
    found on HDFS, where file listing is done in batches: each batch
    contains a partial list of 1000 items (configured by "dfs.ls.limit").
    If the dir is removed, the next listing, e.g. the next hasNext() call
    on the RemoteIterator, fails with FileNotFoundException. Such an error
    on a staging/tmp dir should not fail the metadata loading. However, if
    it happens on a partition dir, the metadata loading should fail to
    avoid stale metadata.
    
    This patch adds a check before listing a dir: if it is a staging/tmp
    dir, catalogd just ignores it. It also adds a debug action,
    catalogd_pause_after_hdfs_remote_iterator_creation, to inject a sleep
    after the first partial listing (which happens when the RemoteIterator
    is created), so the FileNotFoundException can be reproduced reliably.
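    
    A minimal, self-contained sketch (plain Hadoop FileSystem API; the
    path and file counts are hypothetical) of the failure mode: the first
    batch of "dfs.ls.limit" entries is fetched when the iterator is
    created, and a dir deleted afterwards only fails the listing once the
    next batch is requested:
    
      import java.io.FileNotFoundException;
      import java.io.IOException;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileStatus;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.fs.RemoteIterator;
      
      public class StagingDirListing {
        public static void main(String[] args) throws IOException {
          FileSystem fs = FileSystem.get(new Configuration());
          // Hypothetical staging dir holding more than 1000 files.
          Path dir = new Path("/warehouse/tbl/.hive-staging_tmp");
          // Creating the iterator fetches the first batch of entries.
          RemoteIterator<FileStatus> it = fs.listStatusIterator(dir);
          try {
            while (it.hasNext()) { // later batches are fetched lazily here
              System.out.println(it.next().getPath());
            }
          } catch (FileNotFoundException e) {
            // The dir was removed mid-listing. For staging/tmp dirs the
            // patch skips listing them up front instead of failing the
            // whole metadata load.
            System.err.println("Dir removed during listing: " + e);
          }
        }
      }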
    
    Tests:
     - Add test on removing a large staging dir (contains 1024 files) during
       REFRESH. Metadata loading fails consistently before this fix.
     - Add test on removing a large partition dir (contains 1024 files)
       during REFRESH. Verify metadata loading fails as expected.
    
    Change-Id: Ic848e6c8563a1e0bf294cd50167dfc40f66a56cb
    Reviewed-on: http://gerrit.cloudera.org:8080/18801
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18915
    Tested-by: Quanlong Huang <hu...@gmail.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 .../org/apache/impala/common/FileSystemUtil.java   |  21 +++-
 .../java/org/apache/impala/util/DebugUtils.java    |   4 +
 tests/metadata/test_recursive_listing.py           | 116 +++++++++++++++++++--
 tests/util/filesystem_base.py                      |   7 ++
 tests/util/hdfs_util.py                            |  12 +++
 5 files changed, 150 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index e5439d6b7..ac2dbaf37 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -724,7 +724,7 @@ public class FileSystemUtil {
           return listFiles(fs, p, true, debugAction);
         }
         DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
-        return new FilterIterator(p, new RecursingIterator<>(fs, p,
+        return new FilterIterator(p, new RecursingIterator<>(fs, p, debugAction,
             FileSystemUtil::listStatusIterator));
       }
       DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
@@ -748,7 +748,7 @@ public class FileSystemUtil {
       if (hasRecursiveListFiles(fs)) {
         baseIterator = fs.listFiles(p, recursive);
       } else {
-        baseIterator = new RecursingIterator<>(fs, p,
+        baseIterator = new RecursingIterator<>(fs, p, debugAction,
             FileSystemUtil::listLocatedStatusIterator);
       }
       return new FilterIterator(p, baseIterator);
@@ -926,17 +926,22 @@ public class FileSystemUtil {
     private final BiFunctionWithException<FileSystem, Path, RemoteIterator<T>>
         newIterFunc_;
     private final FileSystem fs_;
+    private final String debugAction_;
     private final Stack<RemoteIterator<T>> iters_ = new Stack<>();
     private RemoteIterator<T> curIter_;
     private T curFile_;
 
-    private RecursingIterator(FileSystem fs, Path startPath,
+    private RecursingIterator(FileSystem fs, Path startPath, String debugAction,
         BiFunctionWithException<FileSystem, Path, RemoteIterator<T>> newIterFunc)
         throws IOException {
       this.fs_ = Preconditions.checkNotNull(fs);
+      this.debugAction_ = debugAction;
       this.newIterFunc_ = Preconditions.checkNotNull(newIterFunc);
       Preconditions.checkNotNull(startPath);
       curIter_ = newIterFunc.apply(fs, startPath);
+      LOG.trace("listed start path: {}", startPath);
+      DebugUtils.executeDebugAction(debugAction,
+          DebugUtils.REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION);
     }
 
     @Override
@@ -978,6 +983,13 @@ public class FileSystemUtil {
      * @throws IOException if any IO error occurs
      */
     private void handleFileStat(T fileStatus) throws IOException {
+      LOG.trace("handleFileStat: {}", fileStatus.getPath());
+      if (isIgnoredDir(fileStatus.getPath())) {
+        LOG.debug("Ignoring {} since it is either a hidden directory or a temporary "
+            + "staging directory", fileStatus.getPath());
+        curFile_ = null;
+        return;
+      }
       if (fileStatus.isFile()) {
         curFile_ = fileStatus;
         return;
@@ -987,6 +999,9 @@ public class FileSystemUtil {
       iters_.push(curIter_);
       curIter_ = subIter;
       curFile_ = fileStatus;
+      LOG.trace("listed sub dir: {}", fileStatus.getPath());
+      DebugUtils.executeDebugAction(debugAction_,
+          DebugUtils.REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION);
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 3c48940b3..f1587f7a4 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -40,6 +40,10 @@ public class DebugUtils {
   public static final String REFRESH_HDFS_LISTING_DELAY
       = "catalogd_refresh_hdfs_listing_delay";
 
+  // debug action label for introducing pauses after creating HDFS RemoteIterators.
+  public static final String REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION
+      = "catalogd_pause_after_hdfs_remote_iterator_creation";
+
   // debug action label for introducing delay in alter table recover partitions command.
   public static final String RECOVER_PARTITIONS_DELAY = "catalogd_table_recover_delay";
 
diff --git a/tests/metadata/test_recursive_listing.py b/tests/metadata/test_recursive_listing.py
index ab7abe8f0..d2d50f261 100644
--- a/tests/metadata/test_recursive_listing.py
+++ b/tests/metadata/test_recursive_listing.py
@@ -9,9 +9,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from tests.common.impala_test_suite import ImpalaTestSuite
+
+import pytest
+import requests
+import time
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.test_dimensions import create_uncompressed_text_dimension
-from tests.common.skip import SkipIfLocal
+from tests.common.skip import (SkipIfLocal, SkipIfS3, SkipIfGCS, SkipIfCOS,
+                               SkipIfADLS)
 from tests.util.filesystem_utils import WAREHOUSE
 
 
@@ -21,6 +28,10 @@ class TestRecursiveListing(ImpalaTestSuite):
   This class tests that files are recursively listed within directories
   and partitions, and that REFRESH picks up changes within them.
   """
+  enable_fs_tracing_url = "http://localhost:25020/set_java_loglevel?" \
+                          "class=org.apache.impala.common.FileSystemUtil&level=trace"
+  reset_log_level_url = "http://localhost:25020/reset_java_loglevel"
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -44,13 +55,13 @@ class TestRecursiveListing(ImpalaTestSuite):
     result = self.client.execute("select * from {0}".format(table))
     return result.data
 
-  def test_unpartitioned(self, vector, unique_database):
-    self._do_test(vector, unique_database, partitioned=False)
+  def test_unpartitioned(self, unique_database):
+    self._do_test(unique_database, partitioned=False)
 
-  def test_partitioned(self, vector, unique_database):
-    self._do_test(vector, unique_database, partitioned=True)
+  def test_partitioned(self, unique_database):
+    self._do_test(unique_database, partitioned=True)
 
-  def _do_test(self, vector, unique_database, partitioned):
+  def _init_test_table(self, unique_database, partitioned):
     tbl_name = "t"
     fq_tbl_name = unique_database + "." + tbl_name
     tbl_path = '%s/%s.db/%s' % (WAREHOUSE, unique_database, tbl_name)
@@ -70,6 +81,11 @@ class TestRecursiveListing(ImpalaTestSuite):
     else:
       part_path = tbl_path
 
+    return fq_tbl_name, part_path
+
+  def _do_test(self, unique_database, partitioned):
+    fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned)
+
     # Add a file inside a nested directory and refresh.
     self.filesystem_client.make_dir("{0}/dir1".format(part_path[1:]))
     self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path[1:]),
@@ -125,3 +141,89 @@ class TestRecursiveListing(ImpalaTestSuite):
     self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
     assert len(self._show_files(fq_tbl_name)) == 1
     assert len(self._get_rows(fq_tbl_name)) == 1
+
+  @SkipIfS3.variable_listing_times
+  @SkipIfCOS.variable_listing_times
+  @SkipIfGCS.variable_listing_times
+  @SkipIfADLS.eventually_consistent
+  @pytest.mark.execute_serially
+  @pytest.mark.stress
+  def test_large_staging_dirs(self, unique_database):
+    """Regression test for IMPALA-11464:
+    Test REFRESH survives with concurrent add/remove ops on large staging/tmp dirs
+    which contain more than 1000 files. Execute this serially since the sleep intervals
+    might not work with concurrent workloads. Test this only on HDFS since other FS might
+    not have partial listing (configured by dfs.ls.limit)."""
+    fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned=True)
+    staging_dir = "{0}/.hive-staging".format(part_path)
+    # Expected timeline (before the fix of IMPALA-11464):
+    # 0ms:   catalogd lists the partition dir and waits 200ms.
+    # 200ms: catalogd gets the staging dir and lists it partially, then waits 200ms.
+    # 300ms: the staging dir is removed.
+    # 400ms: catalogd consumes the partial list (1000 items), starts listing the
+    #        remaining items, and gets FileNotFoundException since the dir is gone.
+    # After the fix of IMPALA-11464, catalogd won't list the staging dir at all, so
+    # it avoids hitting the exception.
+    self._test_listing_large_dir(fq_tbl_name, large_dir=staging_dir,
+                                 pause_ms_after_remote_iterator_creation=200,
+                                 pause_ms_before_file_cleanup=300,
+                                 refresh_should_fail=False)
+
+  @SkipIfS3.variable_listing_times
+  @SkipIfCOS.variable_listing_times
+  @SkipIfGCS.variable_listing_times
+  @SkipIfADLS.eventually_consistent
+  @pytest.mark.execute_serially
+  @pytest.mark.stress
+  def test_partition_dir_removed_inflight(self, unique_database):
+    """Test REFRESH with concurrent add/remove ops on large partition dirs
+    which contain more than 1000 files. Execute this serially since the sleep
+    intervals might not work with concurrent workloads. Test this only on HDFS
+    since other FS might not have partial listing (configured by dfs.ls.limit)"""
+    fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned=True)
+    # Expected timeline:
+    # 0ms:   catalogd starts listing the partition dir and gets 1000 items in the
+    #        first partial listing, then waits for 300ms.
+    # 200ms: the partition dir is removed.
+    # 300ms: catalogd has processed the 1000 items and starts listing the remaining
+    #        items, then gets FileNotFoundException since the partition dir is gone.
+    self._test_listing_large_dir(fq_tbl_name, large_dir=part_path + '/',
+                                 pause_ms_after_remote_iterator_creation=300,
+                                 pause_ms_before_file_cleanup=200,
+                                 refresh_should_fail=True)
+
+  def _test_listing_large_dir(self, fq_tbl_name, large_dir,
+                              pause_ms_after_remote_iterator_creation,
+                              pause_ms_before_file_cleanup,
+                              refresh_should_fail):
+    # We need more than 1000 data files (the default of dfs.ls.limit) so that the
+    # initial file listing can't return all of them.
+    files = [large_dir + '/' + str(i) for i in range(1024)]
+    refresh_stmt = "refresh " + fq_tbl_name
+    self.client.set_configuration({
+      "debug_action": "catalogd_pause_after_hdfs_remote_iterator_creation:SLEEP@"
+                      + str(pause_ms_after_remote_iterator_creation)
+    })
+    # Enable TRACE logging in FileSystemUtil for better debugging
+    response = requests.get(self.enable_fs_tracing_url)
+    assert response.status_code == requests.codes.ok
+    try:
+      # self.filesystem_client is a DelegatingHdfsClient. It delegates delete_file_dir()
+      # and make_dir() to the underlying PyWebHdfsClient which expects the HDFS path
+      # without a leading '/'. So we use large_dir[1:] to remove the leading '/'.
+      self.filesystem_client.delete_file_dir(large_dir[1:], recursive=True)
+      self.filesystem_client.make_dir(large_dir[1:])
+      self.filesystem_client.touch(files)
+      LOG.info("created staging files under " + large_dir)
+      handle = self.execute_query_async(refresh_stmt)
+      # Wait a moment to let REFRESH finish the expected partial listing of the dir.
+      time.sleep(pause_ms_before_file_cleanup / 1000.0)
+      self.filesystem_client.delete_file_dir(large_dir[1:], recursive=True)
+      LOG.info("removed staging dir " + large_dir)
+      try:
+        self.client.fetch(refresh_stmt, handle)
+        assert not refresh_should_fail, "REFRESH should fail"
+      except ImpalaBeeswaxException as e:
+        assert refresh_should_fail, "unexpected exception " + str(e)
+    finally:
+      requests.get(self.reset_log_level_url)
diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py
index f74453016..037eefccf 100644
--- a/tests/util/filesystem_base.py
+++ b/tests/util/filesystem_base.py
@@ -69,3 +69,10 @@ class BaseFilesystem(object):
     """Returns a list of integers which are all the file sizes of files found under
     'path'."""
     pass
+
+  @abstractmethod
+  def touch(self, paths):
+    """Updates the access and modification times of the files specified by 'paths' to
+    the current time. If the files don't exist, zero length files will be created with
+    current time as the timestamp of them."""
+    pass
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index 81d07e650..50004d663 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -108,6 +108,8 @@ class DelegatingHdfsClient(BaseFilesystem):
   def getacl(self, path):
     return self.webhdfs_client.getacl(path)
 
+  def touch(self, paths):
+    return self.hdfs_filesystem_client.touch(paths)
 
 class PyWebHdfsClientWithChmod(PyWebHdfsClient):
   def chmod(self, path, permission):
@@ -309,6 +311,16 @@ class HadoopFsCommandLineClient(BaseFilesystem):
     missing."""
     return path if path.startswith('/') else '/' + path
 
+  def touch(self, paths):
+    """Updates the access and modification times of the files specified by 'paths' to
+    the current time. If the files don't exist, zero length files will be created with
+    current time as the timestamp of them."""
+    if isinstance(paths, list):
+      cmd = ['-touch'] + paths
+    else:
+      cmd = ['-touch', paths]
+    (status, stdout, stderr) = self._hadoop_fs_shell(cmd)
+    return status == 0
 
 def get_webhdfs_client_from_conf(conf):
   """Returns a new HTTP client for an HDFS cluster using an HdfsConfig object"""


[impala] 01/06: IMPALA-11034: Resolve schema of old data files in migrated Iceberg tables

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1a0b39442992d8e94aef3cb7c34910dd4c0c0cb3
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Tue Jun 14 14:44:25 2022 +0200

    IMPALA-11034: Resolve schema of old data files in migrated Iceberg tables
    
    When external tables are converted to Iceberg, the data files remain
    intact and thus lack field IDs. Previously, Impala used name-based
    column resolution in this case.
    
    Added a feature that traverses the schema of such data files before
    column resolution and assigns field IDs the same way Iceberg would,
    so that field-ID-based column resolution can be used.
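    
    As a reference, a minimal sketch of the assignment order on a generic
    tree (Node and FieldIdGenerator are illustrative names, not Impala's
    actual schema types): when a node is taken off the stack, all of its
    children receive consecutive IDs first, and only afterwards are the
    children themselves expanded, mirroring how Iceberg numbers the columns
    of a migrated schema.
    
        import java.util.*;
    
        class Node {
          final List<Node> children = new ArrayList<>();
        }
    
        class FieldIdGenerator {
          static Map<Node, Integer> assign(Node root) {
            Map<Node, Integer> ids = new IdentityHashMap<>();
            Deque<Node> stack = new ArrayDeque<>();
            stack.push(root);
            int nextId = 1;
            while (!stack.isEmpty()) {
              Node cur = stack.pop();
              // All children of 'cur' get their IDs before any is expanded.
              for (Node child : cur.children) ids.put(child, nextId++);
              // Push in reverse so children are expanded in original order.
              for (int i = cur.children.size() - 1; i >= 0; i--) {
                stack.push(cur.children.get(i));
              }
            }
            return ids;
          }
        }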
    
    Testing:
    
    The default resolution method was changed to field ID for migrated
    tables; the existing tests exercise it from now on.
    
    Added new tests to cover edge cases with complex types and schema
    evolution.
    
    Change-Id: I77570bbfc2fcc60c2756812d7210110e8cc11ccc
    Reviewed-on: http://gerrit.cloudera.org:8080/18639
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18912
    Tested-by: Quanlong Huang <hu...@gmail.com>
    Reviewed-by: Tamas Mate <tm...@apache.org>
---
 be/src/exec/orc-metadata-utils.cc                  |  63 ++++-
 be/src/exec/orc-metadata-utils.h                   |   9 +-
 be/src/exec/parquet/parquet-metadata-utils.cc      |  69 ++++-
 be/src/exec/parquet/parquet-metadata-utils.h       |  27 +-
 testdata/data/README                               |  28 +++
 .../iceberg_migrated_alter_test/000000_0           | Bin 0 -> 817 bytes
 .../c9f83a82-60f4-443b-9ca4-359cad16fe12-m0.avro   | Bin 0 -> 3182 bytes
 ...396-1-c9f83a82-60f4-443b-9ca4-359cad16fe12.avro | Bin 0 -> 1986 bytes
 .../metadata/v1.metadata.json                      |  81 ++++++
 .../metadata/v2.metadata.json                      | 105 ++++++++
 .../metadata/version-hint.text                     |   1 +
 .../iceberg_migrated_alter_test_orc/000000_0       | Bin 0 -> 418 bytes
 .../340a3b82-71e3-4f50-b030-aecb5a5ea730-m0.avro   | Bin 0 -> 3182 bytes
 ...038-1-340a3b82-71e3-4f50-b030-aecb5a5ea730.avro | Bin 0 -> 1990 bytes
 .../metadata/v1.metadata.json                      |  81 ++++++
 .../metadata/v2.metadata.json                      | 105 ++++++++
 .../metadata/version-hint.text                     |   1 +
 .../iceberg_migrated_complex_test/000000_0         | Bin 0 -> 3006 bytes
 .../152e384f-2851-44b7-9ada-1bfbec74e9fc-m0.avro   | Bin 0 -> 3218 bytes
 ...148-1-152e384f-2851-44b7-9ada-1bfbec74e9fc.avro | Bin 0 -> 1988 bytes
 .../metadata/v1.metadata.json                      | 255 +++++++++++++++++++
 .../metadata/v2.metadata.json                      | 279 +++++++++++++++++++++
 .../metadata/version-hint.text                     |   1 +
 .../iceberg_migrated_complex_test_orc/000000_0     | Bin 0 -> 1217 bytes
 .../8588fd4b-13c1-4451-80ad-5cf71a959b94-m0.avro   | Bin 0 -> 3232 bytes
 ...504-1-8588fd4b-13c1-4451-80ad-5cf71a959b94.avro | Bin 0 -> 1992 bytes
 .../metadata/v1.metadata.json                      | 255 +++++++++++++++++++
 .../metadata/v2.metadata.json                      | 279 +++++++++++++++++++++
 .../metadata/version-hint.text                     |   1 +
 ...iceberg-migrated-table-field-id-resolution.test | 208 +++++++++++++++
 tests/common/file_utils.py                         |  34 +++
 tests/query_test/test_iceberg.py                   |  13 +
 32 files changed, 1874 insertions(+), 21 deletions(-)

diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc
index b08e5c61f..a8fbf58c6 100644
--- a/be/src/exec/orc-metadata-utils.cc
+++ b/be/src/exec/orc-metadata-utils.cc
@@ -17,6 +17,8 @@
 
 #include "exec/orc-metadata-utils.h"
 
+#include <stack>
+
 #include <boost/algorithm/string.hpp>
 
 #include "util/debug-util.h"
@@ -45,13 +47,12 @@ OrcSchemaResolver::OrcSchemaResolver(const HdfsTableDescriptor& tbl_desc,
     filename_(filename),
     is_table_full_acid_(is_table_acid) {
   DetermineFullAcidSchema();
-  if (tbl_desc_.IsIcebergTable() && root_->getSubtypeCount() > 0) {
-    // Use FIELD_ID-based column resolution for Iceberg tables if possible.
-    const orc::Type* first_child =  root_->getSubtype(0);
-    if (first_child->hasAttributeKey(ICEBERG_FIELD_ID)) {
-      schema_resolution_strategy_ = TSchemaResolutionStrategy::FIELD_ID;
-    } else {
-      schema_resolution_strategy_ = TSchemaResolutionStrategy::NAME;
+  if (tbl_desc_.IsIcebergTable()) {
+    schema_resolution_strategy_ = TSchemaResolutionStrategy::FIELD_ID;
+
+    if (root_->getSubtypeCount() > 0
+        && !root_->getSubtype(0)->hasAttributeKey(ICEBERG_FIELD_ID)) {
+      GenerateFieldIDs();
     }
   }
 }
@@ -303,15 +304,57 @@ const orc::Type* OrcSchemaResolver::FindChildWithFieldId(const orc::Type* node,
   for (int i = 0; i < node->getSubtypeCount(); ++i) {
     const orc::Type* child = node->getSubtype(i);
     DCHECK(child != nullptr);
-    if (!child->hasAttributeKey(ICEBERG_FIELD_ID)) return nullptr;
-    std::string field_id_str = child->getAttributeValue(ICEBERG_FIELD_ID);
-    int64_t child_field_id = GetFieldIdFromStr(field_id_str);
+
+    int child_field_id = 0;
+
+    if (LIKELY(child->hasAttributeKey(ICEBERG_FIELD_ID))) {
+      std::string field_id_str = child->getAttributeValue(ICEBERG_FIELD_ID);
+      child_field_id = GetFieldIdFromStr(field_id_str);
+    } else {
+      child_field_id = GetGeneratedFieldID(child);
+    }
+
     if (child_field_id == -1) return nullptr;
     if (child_field_id == field_id) return child;
   }
   return nullptr;
 }
 
+void OrcSchemaResolver::GenerateFieldIDs() {
+  std::stack<const orc::Type*> nodes;
+
+  nodes.push(root_);
+
+  int fieldID = 1;
+
+  while (!nodes.empty()) {
+    const orc::Type* current = nodes.top();
+    nodes.pop();
+
+    uint64_t size = current->getSubtypeCount();
+
+    for (uint64_t i = 0; i < size; i++) {
+      auto retval = orc_type_to_field_id_.emplace(current->getSubtype(i), fieldID++);
+
+      // Emplace has to be successful, otherwise we visited the same node twice
+      DCHECK(retval.second);
+
+      // Push children in reverse order to the stack so they are processed in the original
+      // order
+      nodes.push(current->getSubtype(size - i - 1));
+    }
+  }
+}
+
+int OrcSchemaResolver::GetGeneratedFieldID(const orc::Type* type) const {
+  auto it = orc_type_to_field_id_.find(type);
+
+  // The first column has a field ID but this one does not, so the file is corrupted
+  if (UNLIKELY(it == orc_type_to_field_id_.end())) return -1;
+
+  return it->second;
+}
+
 SchemaPath OrcSchemaResolver::GetCanonicalSchemaPath(const SchemaPath& col_path,
     int current_idx) const {
   DCHECK_LT(current_idx, col_path.size());
diff --git a/be/src/exec/orc-metadata-utils.h b/be/src/exec/orc-metadata-utils.h
index aaab7d46f..d9e3f0612 100644
--- a/be/src/exec/orc-metadata-utils.h
+++ b/be/src/exec/orc-metadata-utils.h
@@ -18,7 +18,6 @@
 #pragma once
 
 #include <orc/OrcFile.hh>
-#include <queue>
 
 #include "runtime/descriptors.h"
 
@@ -112,6 +111,13 @@ class OrcSchemaResolver {
   /// Finds child of 'node' that has Iceberg field id equals to 'field_id'.
   const orc::Type* FindChildWithFieldId(const orc::Type* node, const int field_id) const;
 
+  /// Generates field IDs for the columns in the same order as Iceberg. The traversal
+  /// is preorder, but the assigned field IDs do not follow preorder: when a node is
+  /// processed, all of its child nodes are assigned IDs, hence the difference.
+  void GenerateFieldIDs();
+
+  inline int GetGeneratedFieldID(const orc::Type* type) const;
+
   SchemaPath GetCanonicalSchemaPath(const SchemaPath& col_path, int last_idx) const;
 
   /// Sets 'is_file_full_acid_' based on the file schema.
@@ -122,6 +128,7 @@ class OrcSchemaResolver {
   const char* const filename_ = nullptr;
   const bool is_table_full_acid_;
   bool is_file_full_acid_;
+  std::unordered_map<const orc::Type*, int> orc_type_to_field_id_;
 
   /// Validate whether the ColumnType is compatible with the orc type
   Status ValidateType(const ColumnType& type, const orc::Type& orc_type,
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index bf905b789..b463f2c35 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -19,6 +19,7 @@
 
 #include <strings.h>
 #include <sstream>
+#include <stack>
 #include <string>
 #include <vector>
 
@@ -810,6 +811,13 @@ SchemaNode* ParquetSchemaResolver::NextSchemaNode(
     *missing_field = true;
     return NULL;
   }
+
+  if (UNLIKELY(file_idx == INVALID_ID)) {
+    VLOG_FILE << Substitute("File '$0' is corrupted", filename_);
+    *missing_field = true;
+    return NULL;
+  }
+
   return &node->children[file_idx];
 }
 
@@ -826,7 +834,17 @@ int ParquetSchemaResolver::FindChildWithFieldId(SchemaNode* node,
     const int& field_id) const {
   int idx;
   for (idx = 0; idx < node->children.size(); ++idx) {
-    if (node->children[idx].element->field_id == field_id) break;
+    SchemaNode* child = &node->children[idx];
+
+    int child_field_id = 0;
+
+    if (LIKELY(child->element->__isset.field_id)) {
+      child_field_id = child->element->field_id;
+    } else {
+      child_field_id = GetGeneratedFieldID(child);
+    }
+    if (child_field_id == field_id) return idx;
+    if (UNLIKELY(child_field_id == INVALID_ID)) return INVALID_ID;
   }
   return idx;
 }
@@ -950,4 +968,53 @@ Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
   return Status::OK();
 }
 
+void ParquetSchemaResolver::GenerateFieldIDs() {
+  std::stack<SchemaNode*> nodes;
+
+  nodes.push(&schema_);
+
+  int fieldID = 1;
+
+  while (!nodes.empty()) {
+    SchemaNode* current = nodes.top();
+    nodes.pop();
+
+    uint64_t size = current->children.size();
+
+    for (uint64_t i = 0; i < size; i++) {
+      auto retval = schema_node_to_field_id_.emplace(&current->children[i], fieldID++);
+
+      // Emplace has to be successful, otherwise we visited the same node twice
+      DCHECK(retval.second);
+
+      // Push children in reverse order to the stack so they are processed in the original
+      // order
+      const uint64_t reverse_idx = size - i - 1;
+
+      SchemaNode& current_child = current->children[reverse_idx];
+
+      const parquet::ConvertedType::type child_type =
+          current_child.element->converted_type;
+
+      if (child_type == parquet::ConvertedType::type::LIST
+          || child_type == parquet::ConvertedType::type::MAP) {
+        // Skip middle level
+        DCHECK(current_child.children.size() == 1);
+
+        nodes.push(&current_child.children[0]);
+      } else {
+        nodes.push(&current_child);
+      }
+    }
+  }
+}
+
+int ParquetSchemaResolver::GetGeneratedFieldID(SchemaNode* node) const {
+  auto it = schema_node_to_field_id_.find(node);
+
+  // The first column has a field ID but this one does not, so the file is corrupted
+  if (UNLIKELY(it == schema_node_to_field_id_.end())) return INVALID_ID;
+
+  return it->second;
+}
 }
diff --git a/be/src/exec/parquet/parquet-metadata-utils.h b/be/src/exec/parquet/parquet-metadata-utils.h
index df9656d16..1043bca86 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.h
+++ b/be/src/exec/parquet/parquet-metadata-utils.h
@@ -155,17 +155,14 @@ class ParquetSchemaResolver {
     filename_ = filename;
     // Use FIELD_ID-based column resolution for Iceberg tables if possible.
     const auto& schema = file_metadata->schema;
-    if (tbl_desc_.IsIcebergTable() && schema.size() > 1) {
+    Status status = CreateSchemaTree(file_metadata->schema, &schema_);
+    if (tbl_desc_.IsIcebergTable()) {
+      fallback_schema_resolution_ = TSchemaResolutionStrategy::type::FIELD_ID;
+
       // schema[0] is the 'root', schema[1] is the first column.
-      const parquet::SchemaElement& first_column = schema[1];
-      if (first_column.__isset.field_id) {
-        fallback_schema_resolution_ = TSchemaResolutionStrategy::type::FIELD_ID;
-      } else {
-        // Use Name-based schema resolution in case of missing field ids.
-        fallback_schema_resolution_ = TSchemaResolutionStrategy::type::NAME;
-      }
+      if (schema.size() > 1 && !schema[1].__isset.field_id) GenerateFieldIDs();
     }
-    return CreateSchemaTree(file_metadata->schema, &schema_);
+    return status;
   }
 
   /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
@@ -195,6 +192,9 @@ class ParquetSchemaResolver {
   /// Used to sanity-check Parquet schemas.
   static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024;
 
+  /// Invalid ID used to signal a corrupted file
+  static const int INVALID_ID = -1;
+
   /// Maps from the array-resolution policy to the ordered array encodings that should
   /// be tried during path resolution. All entries have the ONE_LEVEL encoding at the end
   /// because there is no ambiguity between the one-level and the other encodings (there
@@ -229,6 +229,7 @@ class ParquetSchemaResolver {
   /// then the index of the first match is returned.
   int FindChildWithName(SchemaNode* node, const std::string& name) const;
   /// Returns the index of 'node's child with 'field id' for Iceberg tables.
+  /// Returns -1 if the file is corrupted
   int FindChildWithFieldId(SchemaNode* node, const int& field_id) const;
 
   /// The ResolvePathHelper() logic for arrays.
@@ -244,10 +245,18 @@ class ParquetSchemaResolver {
   Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
       const SchemaPath& path, int idx) const;
 
+  /// Generates field IDs for the columns in the same order as Iceberg. The traversal
+  /// is preorder, but the assigned field IDs do not follow preorder: when a node is
+  /// visited, all of its child nodes are assigned IDs, hence the difference.
+  void GenerateFieldIDs();
+
+  inline int GetGeneratedFieldID(SchemaNode* node) const;
+
   const HdfsTableDescriptor& tbl_desc_;
   TSchemaResolutionStrategy::type fallback_schema_resolution_;
   const TParquetArrayResolution::type array_resolution_;
   const char* filename_;
+  std::unordered_map<SchemaNode*, int32_t> schema_node_to_field_id_;
 
   /// Root node of our internal schema representation populated in Init().
   SchemaNode schema_;
diff --git a/testdata/data/README b/testdata/data/README
index 568152329..13ff1162c 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -735,3 +735,31 @@ Generated by Spark 3.2 + Iceberg 0.13. Then the JSON and AVRO files were manuall
 to make these tables correspond to an Iceberg table in a HadoopCatalog instead of
 HiveCatalog.
 The table has a positional delete file.
+
+iceberg_test/iceberg_migrated_alter_test
+Generated and migrated by Hive
+CREATE TABLE iceberg_migrated_alter_test (int_col int, string_col string, double_col double) stored as parquet;
+insert into table iceberg_migrated_alter_test values (0, "A", 0.5), (1, "B", 1.5), (2, "C", 2.5);
+ALTER TABLE iceberg_migrated_alter_test SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
+Then extracted from HDFS and modified so that it can be loaded as an external Hadoop table
+
+iceberg_test/iceberg_migrated_alter_test_orc
+Generated and migrated by Hive
+CREATE TABLE iceberg_migrated_alter_test_orc (int_col int, string_col string, double_col double) stored as orc;
+insert into table iceberg_migrated_alter_test_orc values (0, "A", 0.5), (1, "B", 1.5), (2, "C", 2.5);
+ALTER TABLE iceberg_migrated_alter_test_orc SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
+Then extracted from HDFS and modified so that it can be loaded as an external Hadoop table
+
+iceberg_test/iceberg_migrated_complex_test
+Generated and migrated by Hive
+CREATE TABLE iceberg_migrated_complex_test (struct_1_col struct<int_array_col: array<int>, string_col: string, bool_int_map_col: map<boolean, int>>,  int_bigint_map_col map<int, bigint>, struct_2_col struct<struct_3_col: struct<float_col: float, string_double_map_col: map<string, double>, bigint_array_col: array<bigint>>, int_int_map_col: map<int, int>>) stored as parquet;
+insert into table iceberg_migrated_complex_test values (named_struct("int_array_col", array(0), "string_col", "A", "bool_int_map_col", map(True, 1 )), map(2,CAST(3 as bigint)), named_struct("struct_3_col", named_struct("float_col", cast(0.5 as float), "string_double_map_col", map("B", cast(1.5 as double)), "bigint_array_col", array(cast(4 as bigint))), "int_int_map_col", map(5,6)));
+ALTER TABLE iceberg_migrated_complex_test SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
+Then extracted from HDFS and modified so that it can be loaded as an external Hadoop table
+
+iceberg_test/iceberg_migrated_complex_test_orc
+Generated and migrated by Hive
+CREATE TABLE iceberg_migrated_complex_test_orc (struct_1_col struct<int_array_col: array<int>, string_col: string, bool_int_map_col: map<boolean, int>>,  int_bigint_map_col map<int, bigint>, struct_2_col struct<struct_3_col: struct<float_col: float, string_double_map_col: map<string, double>, bigint_array_col: array<bigint>>, int_int_map_col: map<int, int>>) stored as orc;
+insert into table iceberg_migrated_complex_test_orc values (named_struct("int_array_col", array(0), "string_col", "A", "bool_int_map_col", map(True, 1 )), map(2,CAST(3 as bigint)), named_struct("struct_3_col", named_struct("float_col", cast(0.5 as float), "string_double_map_col", map("B", cast(1.5 as double)), "bigint_array_col", array(cast(4 as bigint))), "int_int_map_col", map(5,6)));
+ALTER TABLE iceberg_migrated_complex_test_orc SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
+Then extracted from HDFS and modified so that it can be loaded as an external Hadoop table
\ No newline at end of file
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test/000000_0 b/testdata/data/iceberg_test/iceberg_migrated_alter_test/000000_0
new file mode 100644
index 000000000..9edbb251b
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_alter_test/000000_0 differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/c9f83a82-60f4-443b-9ca4-359cad16fe12-m0.avro b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/c9f83a82-60f4-443b-9ca4-359cad16fe12-m0.avro
new file mode 100644
index 000000000..4d0699eb1
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/c9f83a82-60f4-443b-9ca4-359cad16fe12-m0.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/snap-2941076094076108396-1-c9f83a82-60f4-443b-9ca4-359cad16fe12.avro b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/snap-2941076094076108396-1-c9f83a82-60f4-443b-9ca4-359cad16fe12.avro
new file mode 100644
index 000000000..065af0120
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/snap-2941076094076108396-1-c9f83a82-60f4-443b-9ca4-359cad16fe12.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/v1.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/v1.metadata.json
new file mode 100644
index 000000000..976f04593
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/v1.metadata.json
@@ -0,0 +1,81 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "7068d1a4-ba6e-4f14-ba63-4014606f91fe",
+  "location" : "/test-warehouse/iceberg_migrated_alter_test",
+  "last-updated-ms" : 1656338172830,
+  "last-column-id" : 3,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656338172",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"int_col\" ]\n}, {\n  \"field-id\" : 2,\n  \"names\" : [ \"string_col\" ]\n}, {\n  \"field-id\" : 3,\n  \"names\" : [ \"double_col\" ]\n} ]",
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "817",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "parquet",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : -1,
+  "snapshots" : [ ],
+  "snapshot-log" : [ ],
+  "metadata-log" : [ ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/v2.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/v2.metadata.json
new file mode 100644
index 000000000..1eb3292e3
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/v2.metadata.json
@@ -0,0 +1,105 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "7068d1a4-ba6e-4f14-ba63-4014606f91fe",
+  "location" : "/test-warehouse/iceberg_migrated_alter_test",
+  "last-updated-ms" : 1656338172947,
+  "last-column-id" : 3,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656338172",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"int_col\" ]\n}, {\n  \"field-id\" : 2,\n  \"names\" : [ \"string_col\" ]\n}, {\n  \"field-id\" : 3,\n  \"names\" : [ \"double_col\" ]\n} ]",
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "817",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "parquet",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : 2941076094076108396,
+  "snapshots" : [ {
+    "snapshot-id" : 2941076094076108396,
+    "timestamp-ms" : 1656338172947,
+    "summary" : {
+      "operation" : "append",
+      "added-data-files" : "1",
+      "added-records" : "3",
+      "added-files-size" : "817",
+      "changed-partition-count" : "1",
+      "total-records" : "3",
+      "total-files-size" : "817",
+      "total-data-files" : "1",
+      "total-delete-files" : "0",
+      "total-position-deletes" : "0",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : "/test-warehouse/iceberg_migrated_alter_test/metadata/snap-2941076094076108396-1-c9f83a82-60f4-443b-9ca4-359cad16fe12.avro",
+    "schema-id" : 0
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1656338172947,
+    "snapshot-id" : 2941076094076108396
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1656338172830,
+    "metadata-file" : "/test-warehouse/iceberg_migrated_alter_test/metadata/v1.metadata.json"
+  } ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/version-hint.text b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/version-hint.text
new file mode 100644
index 000000000..0cfbf0888
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_alter_test/metadata/version-hint.text
@@ -0,0 +1 @@
+2
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/000000_0 b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/000000_0
new file mode 100644
index 000000000..670731972
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/000000_0 differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/340a3b82-71e3-4f50-b030-aecb5a5ea730-m0.avro b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/340a3b82-71e3-4f50-b030-aecb5a5ea730-m0.avro
new file mode 100644
index 000000000..b598a1652
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/340a3b82-71e3-4f50-b030-aecb5a5ea730-m0.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/snap-2205107170480729038-1-340a3b82-71e3-4f50-b030-aecb5a5ea730.avro b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/snap-2205107170480729038-1-340a3b82-71e3-4f50-b030-aecb5a5ea730.avro
new file mode 100644
index 000000000..84ba7e922
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/snap-2205107170480729038-1-340a3b82-71e3-4f50-b030-aecb5a5ea730.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/v1.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/v1.metadata.json
new file mode 100644
index 000000000..087907c8b
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/v1.metadata.json
@@ -0,0 +1,81 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "c7c2b112-b353-4a15-a194-97230ea9ea8b",
+  "location" : "/test-warehouse/iceberg_migrated_alter_test_orc",
+  "last-updated-ms" : 1656404671678,
+  "last-column-id" : 3,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656404671",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"int_col\" ]\n}, {\n  \"field-id\" : 2,\n  \"names\" : [ \"string_col\" ]\n}, {\n  \"field-id\" : 3,\n  \"names\" : [ \"double_col\" ]\n} ]",
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "418",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "orc",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : -1,
+  "snapshots" : [ ],
+  "snapshot-log" : [ ],
+  "metadata-log" : [ ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/v2.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/v2.metadata.json
new file mode 100644
index 000000000..3800fe04b
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/v2.metadata.json
@@ -0,0 +1,105 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "c7c2b112-b353-4a15-a194-97230ea9ea8b",
+  "location" : "/test-warehouse/iceberg_migrated_alter_test_orc",
+  "last-updated-ms" : 1656404671775,
+  "last-column-id" : 3,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656404671",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"int_col\" ]\n}, {\n  \"field-id\" : 2,\n  \"names\" : [ \"string_col\" ]\n}, {\n  \"field-id\" : 3,\n  \"names\" : [ \"double_col\" ]\n} ]",
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "418",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "orc",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : 2205107170480729038,
+  "snapshots" : [ {
+    "snapshot-id" : 2205107170480729038,
+    "timestamp-ms" : 1656404671775,
+    "summary" : {
+      "operation" : "append",
+      "added-data-files" : "1",
+      "added-records" : "3",
+      "added-files-size" : "418",
+      "changed-partition-count" : "1",
+      "total-records" : "3",
+      "total-files-size" : "418",
+      "total-data-files" : "1",
+      "total-delete-files" : "0",
+      "total-position-deletes" : "0",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : "/test-warehouse/iceberg_migrated_alter_test_orc/metadata/snap-2205107170480729038-1-340a3b82-71e3-4f50-b030-aecb5a5ea730.avro",
+    "schema-id" : 0
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1656404671775,
+    "snapshot-id" : 2205107170480729038
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1656404671678,
+    "metadata-file" : "/test-warehouse/iceberg_migrated_alter_test_orc/metadata/00000-87733f1d-9cc3-4427-9451-09ea25c0f4cd.metadata.json"
+  } ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/version-hint.text b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/version-hint.text
new file mode 100644
index 000000000..0cfbf0888
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_alter_test_orc/metadata/version-hint.text
@@ -0,0 +1 @@
+2
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test/000000_0 b/testdata/data/iceberg_test/iceberg_migrated_complex_test/000000_0
new file mode 100644
index 000000000..8eb82dad1
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_complex_test/000000_0 differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/152e384f-2851-44b7-9ada-1bfbec74e9fc-m0.avro b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/152e384f-2851-44b7-9ada-1bfbec74e9fc-m0.avro
new file mode 100644
index 000000000..15b963fb4
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/152e384f-2851-44b7-9ada-1bfbec74e9fc-m0.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/snap-3911840040574896148-1-152e384f-2851-44b7-9ada-1bfbec74e9fc.avro b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/snap-3911840040574896148-1-152e384f-2851-44b7-9ada-1bfbec74e9fc.avro
new file mode 100644
index 000000000..8fa357984
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/snap-3911840040574896148-1-152e384f-2851-44b7-9ada-1bfbec74e9fc.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v1.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v1.metadata.json
new file mode 100644
index 000000000..881fc1565
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v1.metadata.json
@@ -0,0 +1,255 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "61beb841-ed4d-40f3-8be1-7b994886212f",
+  "location" : "/test-warehouse/iceberg_migrated_complex_test",
+  "last-updated-ms" : 1656501596629,
+  "last-column-id" : 21,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656501596",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"struct_1_col\" ],\n  \"fields\" : [ {\n    \"field-id\" : 2,\n    \"names\" : [ \"int_array_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 3,\n      \"names\" : [ \"element\" ]\n    } ]\n  }, {\n    \"field-id\" : 4,\n    \"names\" : [ \"string_col\" ]\n  }, {\n    \"field-id\" : 5,\n    \"names\" : [ \"bool_int_map_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 6,\n      \"names\" : [ \"key\" ]\ [...]
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "3006",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "parquet",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : -1,
+  "snapshots" : [ ],
+  "snapshot-log" : [ ],
+  "metadata-log" : [ ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v2.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v2.metadata.json
new file mode 100644
index 000000000..20a649b7c
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/v2.metadata.json
@@ -0,0 +1,279 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "61beb841-ed4d-40f3-8be1-7b994886212f",
+  "location" : "/test-warehouse/iceberg_migrated_complex_test",
+  "last-updated-ms" : 1656501596751,
+  "last-column-id" : 21,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656501596",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"struct_1_col\" ],\n  \"fields\" : [ {\n    \"field-id\" : 2,\n    \"names\" : [ \"int_array_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 3,\n      \"names\" : [ \"element\" ]\n    } ]\n  }, {\n    \"field-id\" : 4,\n    \"names\" : [ \"string_col\" ]\n  }, {\n    \"field-id\" : 5,\n    \"names\" : [ \"bool_int_map_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 6,\n      \"names\" : [ \"key\" ]\ [...]
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "3006",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "parquet",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : 3911840040574896148,
+  "snapshots" : [ {
+    "snapshot-id" : 3911840040574896148,
+    "timestamp-ms" : 1656501596751,
+    "summary" : {
+      "operation" : "append",
+      "added-data-files" : "1",
+      "added-records" : "1",
+      "added-files-size" : "3006",
+      "changed-partition-count" : "1",
+      "total-records" : "1",
+      "total-files-size" : "3006",
+      "total-data-files" : "1",
+      "total-delete-files" : "0",
+      "total-position-deletes" : "0",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : "/test-warehouse/iceberg_migrated_complex_test/metadata/snap-3911840040574896148-1-152e384f-2851-44b7-9ada-1bfbec74e9fc.avro",
+    "schema-id" : 0
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1656501596751,
+    "snapshot-id" : 3911840040574896148
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1656501596629,
+    "metadata-file" : "/test-warehouse/iceberg_migrated_complex_test/metadata/00000-050bc482-2885-4c5f-82a5-db316f892552.metadata.json"
+  } ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/version-hint.text b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/version-hint.text
new file mode 100644
index 000000000..0cfbf0888
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_complex_test/metadata/version-hint.text
@@ -0,0 +1 @@
+2
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/000000_0 b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/000000_0
new file mode 100644
index 000000000..30bfddb87
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/000000_0 differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/8588fd4b-13c1-4451-80ad-5cf71a959b94-m0.avro b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/8588fd4b-13c1-4451-80ad-5cf71a959b94-m0.avro
new file mode 100644
index 000000000..e9aa6c0b6
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/8588fd4b-13c1-4451-80ad-5cf71a959b94-m0.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/snap-3622599918649152504-1-8588fd4b-13c1-4451-80ad-5cf71a959b94.avro b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/snap-3622599918649152504-1-8588fd4b-13c1-4451-80ad-5cf71a959b94.avro
new file mode 100644
index 000000000..c5c01ff72
Binary files /dev/null and b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/snap-3622599918649152504-1-8588fd4b-13c1-4451-80ad-5cf71a959b94.avro differ
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v1.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v1.metadata.json
new file mode 100644
index 000000000..7b69388c5
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v1.metadata.json
@@ -0,0 +1,255 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "4025e262-f92e-4adb-800d-c42be2a65940",
+  "location" : "/test-warehouse/iceberg_migrated_complex_test_orc",
+  "last-updated-ms" : 1656496318199,
+  "last-column-id" : 21,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656496318",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"struct_1_col\" ],\n  \"fields\" : [ {\n    \"field-id\" : 2,\n    \"names\" : [ \"int_array_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 3,\n      \"names\" : [ \"element\" ]\n    } ]\n  }, {\n    \"field-id\" : 4,\n    \"names\" : [ \"string_col\" ]\n  }, {\n    \"field-id\" : 5,\n    \"names\" : [ \"bool_int_map_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 6,\n      \"names\" : [ \"key\" ]\ [...]
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "1217",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "orc",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : -1,
+  "snapshots" : [ ],
+  "snapshot-log" : [ ],
+  "metadata-log" : [ ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v2.metadata.json b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v2.metadata.json
new file mode 100644
index 000000000..b4024c224
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/v2.metadata.json
@@ -0,0 +1,279 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "4025e262-f92e-4adb-800d-c42be2a65940",
+  "location" : "/test-warehouse/iceberg_migrated_complex_test_orc",
+  "last-updated-ms" : 1656496318303,
+  "last-column-id" : 21,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "struct_1_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 4,
+          "name" : "int_array_col",
+          "required" : false,
+          "type" : {
+            "type" : "list",
+            "element-id" : 7,
+            "element" : "int",
+            "element-required" : false
+          }
+        }, {
+          "id" : 5,
+          "name" : "string_col",
+          "required" : false,
+          "type" : "string"
+        }, {
+          "id" : 6,
+          "name" : "bool_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 8,
+            "key" : "boolean",
+            "value-id" : 9,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    }, {
+      "id" : 2,
+      "name" : "int_bigint_map_col",
+      "required" : false,
+      "type" : {
+        "type" : "map",
+        "key-id" : 10,
+        "key" : "int",
+        "value-id" : 11,
+        "value" : "long",
+        "value-required" : false
+      }
+    }, {
+      "id" : 3,
+      "name" : "struct_2_col",
+      "required" : false,
+      "type" : {
+        "type" : "struct",
+        "fields" : [ {
+          "id" : 12,
+          "name" : "struct_3_col",
+          "required" : false,
+          "type" : {
+            "type" : "struct",
+            "fields" : [ {
+              "id" : 14,
+              "name" : "float_col",
+              "required" : false,
+              "type" : "float"
+            }, {
+              "id" : 15,
+              "name" : "string_double_map_col",
+              "required" : false,
+              "type" : {
+                "type" : "map",
+                "key-id" : 17,
+                "key" : "string",
+                "value-id" : 18,
+                "value" : "double",
+                "value-required" : false
+              }
+            }, {
+              "id" : 16,
+              "name" : "bigint_array_col",
+              "required" : false,
+              "type" : {
+                "type" : "list",
+                "element-id" : 19,
+                "element" : "long",
+                "element-required" : false
+              }
+            } ]
+          }
+        }, {
+          "id" : 13,
+          "name" : "int_int_map_col",
+          "required" : false,
+          "type" : {
+            "type" : "map",
+            "key-id" : 20,
+            "key" : "int",
+            "value-id" : 21,
+            "value" : "int",
+            "value-required" : false
+          }
+        } ]
+      }
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "last_modified_time" : "1656496318",
+    "gc.enabled" : "TRUE",
+    "bucketing_version" : "2",
+    "schema.name-mapping.default" : "[ {\n  \"field-id\" : 1,\n  \"names\" : [ \"struct_1_col\" ],\n  \"fields\" : [ {\n    \"field-id\" : 2,\n    \"names\" : [ \"int_array_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 3,\n      \"names\" : [ \"element\" ]\n    } ]\n  }, {\n    \"field-id\" : 4,\n    \"names\" : [ \"string_col\" ]\n  }, {\n    \"field-id\" : 5,\n    \"names\" : [ \"bool_int_map_col\" ],\n    \"fields\" : [ {\n      \"field-id\" : 6,\n      \"names\" : [ \"key\" ]\ [...]
+    "last_modified_by" : "gfurnstahl",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "numFilesErasureCoded" : "0",
+    "engine.hive.enabled" : "true",
+    "MIGRATED_TO_ICEBERG" : "true",
+    "totalSize" : "1217",
+    "EXTERNAL" : "TRUE",
+    "write.format.default" : "orc",
+    "numFiles" : "1",
+    "TRANSLATED_TO_EXTERNAL" : "TRUE",
+    "table_type" : "ICEBERG"
+  },
+  "current-snapshot-id" : 3622599918649152504,
+  "snapshots" : [ {
+    "snapshot-id" : 3622599918649152504,
+    "timestamp-ms" : 1656496318303,
+    "summary" : {
+      "operation" : "append",
+      "added-data-files" : "1",
+      "added-records" : "1",
+      "added-files-size" : "1217",
+      "changed-partition-count" : "1",
+      "total-records" : "1",
+      "total-files-size" : "1217",
+      "total-data-files" : "1",
+      "total-delete-files" : "0",
+      "total-position-deletes" : "0",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : "/test-warehouse/iceberg_migrated_complex_test_orc/metadata/snap-3622599918649152504-1-8588fd4b-13c1-4451-80ad-5cf71a959b94.avro",
+    "schema-id" : 0
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1656496318303,
+    "snapshot-id" : 3622599918649152504
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1656496318199,
+    "metadata-file" : "/test-warehouse/iceberg_migrated_complex_test_orc/metadata/00000-9ddb4fd1-dcfa-43a7-8eb5-24835fe1b8b7.metadata.json"
+  } ]
+}
diff --git a/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/version-hint.text b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/version-hint.text
new file mode 100644
index 000000000..0cfbf0888
--- /dev/null
+++ b/testdata/data/iceberg_test/iceberg_migrated_complex_test_orc/metadata/version-hint.text
@@ -0,0 +1 @@
+2
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-table-field-id-resolution.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-table-field-id-resolution.test
new file mode 100644
index 000000000..42cc98936
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-table-field-id-resolution.test
@@ -0,0 +1,208 @@
+====
+---- QUERY
+select * from iceberg_migrated_alter_test;
+---- RESULTS
+0,'A',0.5
+1,'B',1.5
+2,'C',2.5
+---- TYPES
+INT, STRING, DOUBLE
+====
+---- QUERY
+alter table iceberg_migrated_alter_test drop column string_col;
+select * from iceberg_migrated_alter_test;
+---- RESULTS
+0,0.5
+1,1.5
+2,2.5
+---- TYPES
+INT, DOUBLE
+====
+---- QUERY
+alter table iceberg_migrated_alter_test add column string_col string;
+select * from iceberg_migrated_alter_test;
+---- RESULTS
+0,0.5,'NULL'
+1,1.5,'NULL'
+2,2.5,'NULL'
+---- TYPES
+INT, DOUBLE, STRING
+====
+---- QUERY
+alter table iceberg_migrated_alter_test change column double_col renamed_double_col double;
+select * from iceberg_migrated_alter_test;
+---- RESULTS
+0,0.5,'NULL'
+1,1.5,'NULL'
+2,2.5,'NULL'
+---- TYPES
+INT, DOUBLE, STRING
+====
+---- QUERY
+alter table iceberg_migrated_alter_test add column double_col double;
+select * from iceberg_migrated_alter_test;
+---- RESULTS
+0,0.5,'NULL',NULL
+1,1.5,'NULL',NULL
+2,2.5,'NULL',NULL
+---- TYPES
+INT, DOUBLE, STRING, DOUBLE
+====
+---- QUERY
+insert into iceberg_migrated_alter_test values (3,3.5,"D",3.8);
+select * from iceberg_migrated_alter_test;
+---- RESULTS
+3,3.5,'D',3.8
+0,0.5,'NULL',NULL
+1,1.5,'NULL',NULL
+2,2.5,'NULL',NULL
+---- TYPES
+INT, DOUBLE, STRING, DOUBLE
+====
+---- QUERY
+select * from iceberg_migrated_alter_test_orc;
+---- RESULTS
+0,'A',0.5
+1,'B',1.5
+2,'C',2.5
+---- TYPES
+INT, STRING, DOUBLE
+====
+---- QUERY
+alter table iceberg_migrated_alter_test_orc drop column string_col;
+select * from iceberg_migrated_alter_test_orc;
+---- RESULTS
+0,0.5
+1,1.5
+2,2.5
+---- TYPES
+INT, DOUBLE
+====
+---- QUERY
+alter table iceberg_migrated_alter_test_orc add column string_col string;
+select * from iceberg_migrated_alter_test_orc;
+---- RESULTS
+0,0.5,'NULL'
+1,1.5,'NULL'
+2,2.5,'NULL'
+---- TYPES
+INT, DOUBLE, STRING
+====
+---- QUERY
+alter table iceberg_migrated_alter_test_orc change column double_col renamed_double_col double;
+select * from iceberg_migrated_alter_test_orc;
+---- RESULTS
+0,0.5,'NULL'
+1,1.5,'NULL'
+2,2.5,'NULL'
+---- TYPES
+INT, DOUBLE, STRING
+====
+---- QUERY
+alter table iceberg_migrated_alter_test_orc add column double_col double;
+select * from iceberg_migrated_alter_test_orc;
+---- RESULTS
+0,0.5,'NULL',NULL
+1,1.5,'NULL',NULL
+2,2.5,'NULL',NULL
+---- TYPES
+INT, DOUBLE, STRING, DOUBLE
+====
+---- QUERY
+select struct_1_col.string_col, struct_2_col.struct_3_col.float_col from iceberg_migrated_complex_test;
+---- RESULTS
+'A',0.5
+---- TYPES
+STRING, FLOAT
+====
+---- QUERY
+select my_array_1.pos, my_array_1.item from iceberg_migrated_complex_test, iceberg_migrated_complex_test.struct_1_col.int_array_col as my_array_1;
+---- RESULTS
+0,0
+---- TYPES
+BIGINT, INT
+====
+---- QUERY
+select my_array_2.pos, my_array_2.item from iceberg_migrated_complex_test, iceberg_migrated_complex_test.struct_2_col.struct_3_col.bigint_array_col as my_array_2;
+---- RESULTS
+0,4
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select my_map_1.key, my_map_1.value from iceberg_migrated_complex_test, iceberg_migrated_complex_test.struct_1_col.bool_int_map_col as my_map_1;
+---- RESULTS
+true,1
+---- TYPES
+BOOLEAN, INT
+====
+---- QUERY
+select my_map_2.key, my_map_2.value from iceberg_migrated_complex_test, iceberg_migrated_complex_test.int_bigint_map_col as my_map_2;
+---- RESULTS
+2,3
+---- TYPES
+INT, BIGINT
+====
+---- QUERY
+select my_map_3.key, my_map_3.value from iceberg_migrated_complex_test, iceberg_migrated_complex_test.struct_2_col.struct_3_col.string_double_map_col as my_map_3;
+---- RESULTS
+'B',1.5
+---- TYPES
+STRING, DOUBLE
+====
+---- QUERY
+select my_map_4.key, my_map_4.value from iceberg_migrated_complex_test, iceberg_migrated_complex_test.struct_2_col.int_int_map_col as my_map_4;
+---- RESULTS
+5,6
+---- TYPES
+INT, INT
+====
+---- QUERY
+select struct_1_col.string_col, struct_2_col.struct_3_col.float_col from iceberg_migrated_complex_test_orc;
+---- RESULTS
+'A',0.5
+---- TYPES
+STRING, FLOAT
+====
+---- QUERY
+select my_array_1.pos, my_array_1.item from iceberg_migrated_complex_test_orc, iceberg_migrated_complex_test_orc.struct_1_col.int_array_col as my_array_1;
+---- RESULTS
+0,0
+---- TYPES
+BIGINT, INT
+====
+---- QUERY
+select my_array_2.pos, my_array_2.item from iceberg_migrated_complex_test_orc, iceberg_migrated_complex_test_orc.struct_2_col.struct_3_col.bigint_array_col as my_array_2;
+---- RESULTS
+0,4
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select my_map_1.key, my_map_1.value from iceberg_migrated_complex_test_orc, iceberg_migrated_complex_test_orc.struct_1_col.bool_int_map_col as my_map_1;
+---- RESULTS
+true,1
+---- TYPES
+BOOLEAN, INT
+====
+---- QUERY
+select my_map_2.key, my_map_2.value from iceberg_migrated_complex_test_orc, iceberg_migrated_complex_test_orc.int_bigint_map_col as my_map_2;
+---- RESULTS
+2,3
+---- TYPES
+INT, BIGINT
+====
+---- QUERY
+select my_map_3.key, my_map_3.value from iceberg_migrated_complex_test_orc, iceberg_migrated_complex_test_orc.struct_2_col.struct_3_col.string_double_map_col as my_map_3;
+---- RESULTS
+'B',1.5
+---- TYPES
+STRING, DOUBLE
+====
+---- QUERY
+select my_map_4.key, my_map_4.value from iceberg_migrated_complex_test_orc, iceberg_migrated_complex_test_orc.struct_2_col.int_int_map_col as my_map_4;
+---- RESULTS
+5,6
+---- TYPES
+INT, INT
+====
\ No newline at end of file
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
index 2de55494f..de276cae7 100644
--- a/tests/common/file_utils.py
+++ b/tests/common/file_utils.py
@@ -26,6 +26,40 @@ from subprocess import check_call
 from tests.util.filesystem_utils import get_fs_path
 
 
+def create_iceberg_table_from_directory(impala_client, unique_database, table_name,
+                                        file_format):
+  """Utility function to create an iceberg table from a directory. The directory must
+  exist in $IMPALA_HOME/testdata/data/iceberg_test with the name 'table_name'"""
+
+  # Only ORC and Parquet are tested/supported
+  assert file_format == "orc" or file_format == "parquet"
+
+  local_dir = os.path.join(
+    os.environ['IMPALA_HOME'], 'testdata/data/iceberg_test/{0}'.format(table_name))
+  assert os.path.isdir(local_dir)
+
+  # Upload the directory under the warehouse root; '-put' creates /test-warehouse/<table_name>
+  hdfs_parent_dir = get_fs_path("/test-warehouse")
+
+  hdfs_dir = os.path.join(hdfs_parent_dir, table_name)
+
+  # Purge existing files if any
+  check_call(['hdfs', 'dfs', '-rm', '-f', '-r', hdfs_dir])
+
+  # Note: -d skips a staging copy
+  check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_parent_dir])
+
+  # Create external table
+  qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
+  impala_client.execute("""create external table {0} stored as iceberg location '{1}'
+                        tblproperties('write.format.default'='{2}', 'iceberg.catalog'=
+                        'hadoop.tables')""".format(qualified_table_name, hdfs_dir,
+                                                   file_format))
+
+  # Automatic clean up after drop table
+  impala_client.execute("""alter table {0} set tblproperties ('external.table.purge'=
+                        'True');""".format(qualified_table_name))
+
 def create_table_from_parquet(impala_client, unique_database, table_name):
   """Utility function to create a database table from a Parquet file. A Parquet file must
   exist in $IMPALA_HOME/testdata/data with the name 'table_name'.parquet"""
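
For reference, a minimal sketch of driving the new helper from a test (the
method name and the assertion are hypothetical illustrations, assuming the
standard ImpalaTestSuite fixtures; the real usage is in test_iceberg.py below):

    from tests.common.file_utils import create_iceberg_table_from_directory

    def test_migrated_table_sketch(self, vector, unique_database):
        # Uploads testdata/data/iceberg_test/iceberg_migrated_alter_test to
        # /test-warehouse and registers it as an external HadoopTables table.
        create_iceberg_table_from_directory(
            self.client, unique_database, "iceberg_migrated_alter_test", "parquet")
        result = self.client.execute(
            "select * from {0}.iceberg_migrated_alter_test".format(unique_database))
        assert len(result.data) == 3  # the test table ships with three rows
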
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index aab727fb6..230938dee 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -33,6 +33,7 @@ from tests.common.skip import SkipIf
 
 from tests.util.filesystem_utils import get_fs_path, IS_HDFS
 from tests.util.get_parquet_metadata import get_parquet_metadata
+from tests.common.file_utils import create_iceberg_table_from_directory
 
 class TestIcebergTable(ImpalaTestSuite):
   """Tests related to Iceberg tables."""
@@ -102,6 +103,18 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_migrated_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-migrated-tables', vector, unique_database)
 
+  def test_migrated_table_field_id_resolution(self, vector, unique_database):
+    create_iceberg_table_from_directory(self.client, unique_database,
+                                        "iceberg_migrated_alter_test", "parquet")
+    create_iceberg_table_from_directory(self.client, unique_database,
+                                        "iceberg_migrated_complex_test", "parquet")
+    create_iceberg_table_from_directory(self.client, unique_database,
+                                        "iceberg_migrated_alter_test_orc", "orc")
+    create_iceberg_table_from_directory(self.client, unique_database,
+                                        "iceberg_migrated_complex_test_orc", "orc")
+    self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution',
+                       vector, unique_database)
+
   def test_describe_history(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-table-history', vector, use_db=unique_database)
 


[impala] 06/06: IMPALA-11281: Load table metadata for ResetMetadataStmt

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5cae46a4bf359c10a657fb4c9e1e8f4e25187c9e
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Mon May 23 12:02:26 2022 -0700

    IMPALA-11281: Load table metadata for ResetMetadataStmt
    
    This patch loads the table's metadata for a ResetMetadataStmt when the
    table is not null and Ranger table masking is supported. The loaded
    column information lets the frontend check whether a masking policy is
    enabled for any column of the table, so that the metadata update
    operation on such a table can be blocked.
    
    Testing:
     - Added an E2E test verifying that a metadata update operation on a
       table is denied for a requesting user who has a column masking
       policy defined on any column of that table, even when the table
       metadata has been invalidated immediately before that user attempts
       to invalidate it again.
    
    Change-Id: I0c90b413974223886661697f11844d99a68fdebf
    Reviewed-on: http://gerrit.cloudera.org:8080/18561
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18917
    Tested-by: Quanlong Huang <hu...@gmail.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 .../apache/impala/analysis/StmtMetadataLoader.java | 12 ++++++++-
 tests/authorization/test_ranger.py                 | 29 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index 5da12183f..9bceee118 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -347,7 +347,17 @@ public class StmtMetadataLoader {
   private Set<TableName> collectTableCandidates(StatementBase stmt) {
     Preconditions.checkNotNull(stmt);
     List<TableRef> tblRefs = new ArrayList<>();
-    stmt.collectTableRefs(tblRefs);
+    // The information about whether table masking is supported is not available to
+    // ResetMetadataStmt so we collect the TableRef for ResetMetadataStmt whenever
+    // applicable.
+    if (stmt instanceof ResetMetadataStmt
+        && fe_.getAuthzFactory().getAuthorizationConfig().isEnabled()
+        && fe_.getAuthzFactory().supportsTableMasking()) {
+      TableName tableName = ((ResetMetadataStmt) stmt).getTableName();
+      if (tableName != null) tblRefs.add(new TableRef(tableName.toPath(), null));
+    } else {
+      stmt.collectTableRefs(tblRefs);
+    }
     Set<TableName> tableNames = new HashSet<>();
     for (TableRef ref: tblRefs) {
       tableNames.addAll(Path.getCandidateTables(ref.getPath(), sessionDb_));
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index d0caa7b17..eae23d4c7 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1073,6 +1073,35 @@ class TestRanger(CustomClusterTestSuite):
       for i in range(policy_cnt):
         TestRanger._remove_policy(unique_name + str(i))
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_block_metadata_update(self, vector, unique_name):
+    """Test that the metadata update operation on a table by a requesting user is denied
+       if there exists a column masking policy defined on any column in the table for the
+       requesting user even when the table metadata (e.g., list of columns) have been
+       invalidated immediately before the requesting user tries to invalidate the table
+       metadata again. This test would have failed if we did not load the table metadata
+       for ResetMetadataStmt."""
+    user = getuser()
+    admin_client = self.create_impala_client()
+    non_owner_client = self.create_impala_client()
+    try:
+      TestRanger._add_column_masking_policy(
+          unique_name, user, "functional", "alltypestiny", "id",
+          "CUSTOM", "id * 100")
+      self.execute_query_expect_success(admin_client,
+          "invalidate metadata functional.alltypestiny", user=ADMIN)
+      admin_client.execute("grant all on server to user {0}".format(user))
+      result = self.execute_query_expect_failure(
+          non_owner_client, "invalidate metadata functional.alltypestiny", user=user)
+      assert "User '{0}' does not have privileges to execute " \
+          "'INVALIDATE METADATA/REFRESH' on: functional.alltypestiny".format(user) \
+          in str(result)
+    finally:
+      TestRanger._remove_policy(unique_name)
+      admin_client.execute("revoke all on server from user {0}".format(user))
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)


[impala] 03/06: IMPALA-11457 Fix regression with unknown disk id

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 15fd47ed1a5151bd620e404612c82cf7fe94e9b0
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Tue Jul 26 10:32:11 2022 -0700

    IMPALA-11457 Fix regression with unknown disk id
    
    Fixes an issue where the disk ID was incorrectly set to 65535 (the -1
    "unknown" sentinel wrapped around) when the storage ID is unknown, due
    to mixed usage of short and ushort (introduced by
    d6b5f82e31732c355af3f3d1a8e5da94ba9c1349). The bogus disk ID sent all
    local reads to a single disk queue and reduced parallelism, which
    specifically happens with co-located Ozone endpoints.
    
    This issue caused the test_admission_control_with_multiple_coords to
    fail with Ozone.
    
    Change-Id: I571ac0669ceb6a42561594c3f96723d5ed293902
    Reviewed-on: http://gerrit.cloudera.org:8080/18792
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18914
    Tested-by: Quanlong Huang <hu...@gmail.com>
    Reviewed-by: Michael Smith <mi...@cloudera.com>
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
---
 common/fbs/CatalogObjects.fbs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/common/fbs/CatalogObjects.fbs b/common/fbs/CatalogObjects.fbs
index 629938c6c..ce0f1a693 100644
--- a/common/fbs/CatalogObjects.fbs
+++ b/common/fbs/CatalogObjects.fbs
@@ -52,8 +52,8 @@ table FbFileBlock {
   replica_host_idxs: [ushort] (id: 2);
 
   // The list of disk ids for the file block. May not be set if disk ids are not
-  // supported.
-  disk_ids: [ushort] (id: 3);
+  // supported. Set to -1 for unknown disk id.
+  disk_ids: [short] (id: 3);
 }
 
 table FbFileDesc {
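
To make the signedness bug concrete, a minimal Python sketch (an illustration
only, not code from this patch) of how the frontend's -1 "unknown" sentinel
behaves under the old ushort field versus the new short field:

    import ctypes

    UNKNOWN_DISK_ID = -1

    # Old schema (disk_ids: [ushort]): the sentinel wraps around to 65535,
    # which looks like a real disk id, so all such blocks land in one queue.
    assert ctypes.c_uint16(UNKNOWN_DISK_ID).value == 65535

    # New schema (disk_ids: [short]): -1 survives the round trip, so the
    # reader can recognize the disk id as unknown.
    assert ctypes.c_int16(UNKNOWN_DISK_ID).value == -1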