You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2023/03/10 16:35:00 UTC

[impala] branch master updated (0c7c6a335 -> b5524e95a)

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 0c7c6a335 IMPALA-11977: Fix Python 3 broken imports and object model differences
     new 29586d663 IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.
     new b5524e95a IMPALA-11935: Generate core dumps if ASAN/TSAN/UBSAN built be tests crash

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bin/run-backend-tests.sh                           |   6 +-
 common/thrift/JniCatalog.thrift                    |  34 ++-
 fe/src/main/cup/sql-parser.cup                     |   3 +-
 ...a => AlterTableExecuteExpireSnapshotsStmt.java} |  93 +++----
 .../analysis/AlterTableExecuteRollbackStmt.java    | 147 +++++++++++
 .../impala/analysis/AlterTableExecuteStmt.java     | 107 +++-----
 .../java/org/apache/impala/catalog/FeFsTable.java  |   2 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  25 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |  56 +++-
 fe/src/main/jflex/sql-scanner.flex                 |   3 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  47 +++-
 .../QueryTest/iceberg-rollback-negative.test       |  27 ++
 tests/custom_cluster/test_events_custom_configs.py |  17 +-
 tests/query_test/test_iceberg.py                   | 291 ++++++++++++++-------
 tests/util/iceberg_util.py                         |  31 +++
 15 files changed, 623 insertions(+), 266 deletions(-)
 copy fe/src/main/java/org/apache/impala/analysis/{AlterTableExecuteStmt.java => AlterTableExecuteExpireSnapshotsStmt.java} (54%)
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test


[impala] 02/02: IMPALA-11935: Generate core dumps if ASAN/TSAN/UBSAN built be tests crash

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b5524e95a1f8ddc920287b9092558a86c987d037
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Tue Mar 7 14:52:17 2023 +0100

    IMPALA-11935: Generate core dumps if ASAN/TSAN/UBSAN built be tests crash
    
    By default, on 64-bit systems, programs built with sanitizers do not
    produce minidumps nor coredumps due to the shadow memory - which is used
    for house keeping - being huge and the generated dumps would be too
    big (~16TB).
    
    https://gcc.gnu.org/bugzilla//show_bug.cgi?id=89868
    
    The shadow memory can be stripped on exit and the core
    dumps can be generated by setting the (T|A|UB)SAN_OPTIONS environment
    variable before running the program.
    
    https://stackoverflow.com/questions/42851670/how-to-generate-core-dump-on-addresssanitizer-error
    
    This can help investigating crashes in sanitized builds.
    
    Testing:
     - Locally injected an std::abort() to a test case, built it with TSAN
    and verified the creation of the core dump and gdb showed the correct
    stack frame.
    
    Change-Id: Idd868fe0f666d683084a24808dd0dcd7766b837c
    Reviewed-on: http://gerrit.cloudera.org:8080/19598
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/run-backend-tests.sh | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/bin/run-backend-tests.sh b/bin/run-backend-tests.sh
index 4be9b1788..9952496f1 100755
--- a/bin/run-backend-tests.sh
+++ b/bin/run-backend-tests.sh
@@ -39,5 +39,9 @@ cd ..
 
 export CTEST_OUTPUT_ON_FAILURE=1
 
+export TSAN_OPTIONS="disable_coredump=0:unmap_shadow_on_exit=1"
+export ASAN_OPTIONS="disable_coredump=0:unmap_shadow_on_exit=1"
+export UBSAN_OPTIONS="disable_coredump=0:unmap_shadow_on_exit=1"
+
 export PATH="${IMPALA_TOOLCHAIN_PACKAGES_HOME}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
-"${MAKE_CMD:-make}" test ARGS="${BE_TEST_ARGS}"
+"${MAKE_CMD:-make}" test ARGS="${BE_TEST_ARGS}"
\ No newline at end of file


[impala] 01/02: IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 29586d6631d97acf13f208564db7c8c072154019
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Tue Sep 13 18:29:57 2022 -0700

    IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.
    
    Iceberg table modifications cause new table snapshots to be created;
    these snapshots represent an earlier version of the table. The Iceberg
    API provides a way to rollback the table to a previous snapshot.
    
    This change adds the ability to execute a rollback on Iceberg tables
    using the following statements:
    
    - ALTER TABLE <tbl> EXECUTE ROLLBACK(<snapshot id>)
    - ALTER TABLE <tbl> EXECUTE ROLLBACK('<timestamp>')
    
    The latter form of the command rolls back to the most recent snapshot
    that has a creation timestamp that is older than the specified
    timestamp.
    
    Note that when a table is rolled back to a snapshot, a new snapshot is
    created with the same snapshot id, but with a new creation timestamp.
    
    Testing:
     - Added analysis unit tests.
     - Added e2e tests.
     - Converted test_time_travel to use get_snapshots() from iceberg_util.
     - Add a utility class to allow pytests to create tables with various
       iceberg catalogs.
    
    Change-Id: Ic74913d3b81103949ffb5eef7cc936303494f8b9
    Reviewed-on: http://gerrit.cloudera.org:8080/19002
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/JniCatalog.thrift                    |  34 ++-
 fe/src/main/cup/sql-parser.cup                     |   3 +-
 ...a => AlterTableExecuteExpireSnapshotsStmt.java} |  93 +++----
 .../analysis/AlterTableExecuteRollbackStmt.java    | 147 +++++++++++
 .../impala/analysis/AlterTableExecuteStmt.java     | 107 +++-----
 .../java/org/apache/impala/catalog/FeFsTable.java  |   2 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  25 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |  56 +++-
 fe/src/main/jflex/sql-scanner.flex                 |   3 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  47 +++-
 .../QueryTest/iceberg-rollback-negative.test       |  27 ++
 tests/custom_cluster/test_events_custom_configs.py |  17 +-
 tests/query_test/test_iceberg.py                   | 291 ++++++++++++++-------
 tests/util/iceberg_util.py                         |  31 +++
 14 files changed, 618 insertions(+), 265 deletions(-)

diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index c41762a6f..56398eaf0 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -414,12 +414,38 @@ struct TAlterTableSetPartitionSpecParams {
   1: required CatalogObjects.TIcebergPartitionSpec partition_spec
 }
 
-// Parameters for ALTER TABLE EXECUTE operations.
-struct TAlterTableExecuteParams {
-  // The parameter of the ExpireSnapshot.expireOlderThan(timestampMillis) Iceberg call.
+// Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS operations.
+struct TAlterTableExecuteExpireSnapshotsParams {
   1: required i64 older_than_millis
 }
 
+// ALTER TABLE EXECUTE ROLLBACK can be to a date or snapshot id.
+enum TRollbackType {
+  TIME_ID = 0
+  VERSION_ID = 1
+}
+
+// Parameters for ALTER TABLE EXECUTE ROLLBACK operations.
+struct TAlterTableExecuteRollbackParams {
+  // Is rollback to a date or snapshot id.
+  1: required TRollbackType kind
+
+  // If kind is TIME_ID this is the date to rollback to.
+  2: optional i64 timestamp_millis
+
+  // If kind is VERSION_ID this is the id to rollback to.
+  3: optional i64 snapshot_id
+}
+
+// Parameters for ALTER TABLE EXECUTE ... operations.
+struct TAlterTableExecuteParams {
+  // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
+  1: optional TAlterTableExecuteExpireSnapshotsParams expire_snapshots_params
+
+  // Parameters for ALTER TABLE EXECUTE ROLLBACK
+  2: optional TAlterTableExecuteRollbackParams execute_rollback_params
+}
+
 // Parameters for all ALTER TABLE commands.
 struct TAlterTableParams {
   1: required TAlterTableType alter_type
@@ -478,7 +504,7 @@ struct TAlterTableParams {
   // Parameters for ALTER TABLE SET PARTITION SPEC
   19: optional TAlterTableSetPartitionSpecParams set_partition_spec_params
 
-  // Parameters for ALTER TABLE EXECUTE
+  // Parameters for ALTER TABLE EXECUTE operations
   20: optional TAlterTableExecuteParams set_execute_params
 }
 
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index dcf8eba5c..b136a4631 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -37,6 +37,7 @@ import org.apache.impala.analysis.SetOperationStmt.SetOperator;
 import org.apache.impala.analysis.RangePartition;
 import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
+import org.apache.impala.analysis.AlterTableExecuteStmt;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionTransform;
@@ -1361,7 +1362,7 @@ alter_tbl_stmt ::=
   :}
   | KW_ALTER KW_TABLE table_name:table KW_EXECUTE function_call_expr:expr
   {:
-    RESULT = new AlterTableExecuteStmt(table, expr);
+    RESULT = AlterTableExecuteStmt.createExecuteStmt(table, expr);
   :}
   ;
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
similarity index 54%
copy from fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
copy to fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
index 141aa386d..16d8a3753 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
@@ -17,10 +17,13 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
 import org.apache.impala.thrift.TAlterTableExecuteParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableType;
@@ -28,59 +31,47 @@ import org.apache.impala.util.ExprUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
- * Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
- * tables, supported operations:
- *  - expire_snapshots(<timestamp>): uses the ExpireSnapshot API to expire snaphosts,
- *    calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
- *    TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
- *    should be retained even when all snapshots are selected by expireOlderThan().
+ * Represents an ALTER TABLE <tbl> EXECUTE EXPIRE_SNAPSHOTS(<parameters>) statement on
+ * Iceberg tables, the parameter is (<timestamp>).
  */
-public class AlterTableExecuteStmt extends AlterTableStmt {
-  private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
-
-  private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
+public class AlterTableExecuteExpireSnapshotsStmt extends AlterTableExecuteStmt {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AlterTableExecuteExpireSnapshotsStmt.class);
 
-  // Expression of the function call after EXECUTE keyword. Parsed into an operation and
-  // a value of that operation.
-  private FunctionCallExpr fnCallExpr_;
+  protected final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
 
-  // Value expression from fnCallExpr_.
-  private Expr fnParamValue_;
-
-  // The value after extracted from fnParamValue_ expression.
-  private long olderThanMillis_ = -1;
-
-  protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
-    super(tableName);
-    fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+  protected AlterTableExecuteExpireSnapshotsStmt(TableName tableName, Expr fnCallExpr) {
+    super(tableName, fnCallExpr);
   }
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
     Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
-    analyzeFunctionCallExpr(analyzer);
+    analyzeFunctionCallExpr(analyzer, USAGE);
     analyzeOlderThan(analyzer);
   }
 
-  private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
-    // fnCallExpr_ analyzed here manually, because it is not an actual function but a
-    // catalog operation.
-    String fnName = fnCallExpr_.getFnName().toString();
-    if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
-      throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
-          "TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
-    }
-    if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
-    }
-    fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return fnCallExpr_.toSql();
   }
 
-  private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteExpireSnapshotsParams executeExpireSnapshotsParams =
+        new TAlterTableExecuteExpireSnapshotsParams();
+    executeParams.setExpire_snapshots_params(executeExpireSnapshotsParams);
+    executeExpireSnapshotsParams.setOlder_than_millis(olderThanMillis_);
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+
+  protected void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
     Preconditions.checkNotNull(fnParamValue_);
     fnParamValue_.analyze(analyzer);
     if (!fnParamValue_.isConstant()) {
@@ -90,31 +81,17 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
     }
     if (!fnParamValue_.getType().isTimestamp()) {
-      throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
-          fnParamValue_.getType() + "': " + fnParamValue_.toSql());
+      throw new AnalysisException(USAGE + " must be a timestamp type but is '"
+          + fnParamValue_.getType() + "': " + fnParamValue_.toSql());
     }
     try {
       olderThanMillis_ =
           ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
-      LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
+      LOG.debug(USAGE + " millis: " + olderThanMillis_);
     } catch (InternalException ie) {
-      throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
-          USAGE + ": " + ie.getMessage(), ie);
+      throw new AnalysisException("Invalid TIMESTAMP expression has been given to "
+              + USAGE + ": " + ie.getMessage(),
+          ie);
     }
   }
-
-  @Override
-  public String toSql(ToSqlOptions options) {
-    return fnCallExpr_.toSql();
-  }
-
-  @Override
-  public TAlterTableParams toThrift() {
-    TAlterTableParams params = super.toThrift();
-    params.setAlter_type(TAlterTableType.EXECUTE);
-    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
-    executeParams.setOlder_than_millis(olderThanMillis_);
-    params.setSet_execute_params(executeParams);
-    return params;
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
new file mode 100644
index 000000000..ea4da0b67
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TRollbackType;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an ALTER TABLE EXECUTE ROLLBACK(<parameter>) statement.
+ * The parameter can be a snapshot id, or a timestamp.
+ * A rollback to a snapshot id causes a new snapshot to be
+ * created with the same snapshot id, but with a new creation timestamp.
+ * A rollback to a timestamp rolls back to the latest snapshot
+ * that has a creation timestamp that is older than the specified
+ * timestamp.
+ */
+public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
+  public static final String USAGE = "EXECUTE ROLLBACK(<expression>):";
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AlterTableExecuteRollbackStmt.class);
+  private long snapshotVersion_;
+  private TRollbackType kind_;
+
+  public AlterTableExecuteRollbackStmt(TableName tableName, FunctionCallExpr fnCallExpr) {
+    super(tableName, fnCallExpr);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    FeTable table = getTargetTable();
+    if (!(table instanceof FeIcebergTable)) {
+      throw new AnalysisException("ALTER TABLE EXECUTE ROLLBACK is only supported "
+          + "for Iceberg tables: " + table.getTableName());
+    }
+    analyzeFunctionCallExpr(analyzer, USAGE);
+    analyzeParameter(analyzer);
+  }
+
+  private void analyzeParameter(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(fnParamValue_);
+    fnParamValue_.analyze(analyzer);
+
+    if (!fnParamValue_.isConstant()) {
+      throw new AnalysisException(
+          USAGE + " <expression> must be a constant expression: EXECUTE " + toSql());
+    }
+    if ((fnParamValue_ instanceof LiteralExpr)
+        && (fnParamValue_.getType().isIntegerType())) {
+      // Parameter is a snapshot id
+      kind_ = TRollbackType.VERSION_ID;
+      snapshotVersion_ = fnParamValue_.evalToInteger(analyzer, USAGE);
+      if (snapshotVersion_ < 0) {
+        throw new AnalysisException("Invalid version number has been given to " + USAGE
+            + ": " + snapshotVersion_);
+      }
+      LOG.debug(USAGE + " version: " + snapshotVersion_);
+    } else {
+      Expr timestampEpr = getParamConvertibleToTimestamp();
+      if (timestampEpr != null) {
+        // Parameter is a timestamp.
+        kind_ = TRollbackType.TIME_ID;
+        try {
+          olderThanMillis_ =
+              ExprUtil.localTimestampToUnixTimeMicros(analyzer, timestampEpr) / 1000;
+          LOG.debug(USAGE + " millis: " + olderThanMillis_);
+        } catch (InternalException ie) {
+          throw new AnalysisException("An invalid TIMESTAMP expression has been given "
+                  + "to " + USAGE + " the expression " + fnParamValue_.toSql()
+                  + " cannot be converted to a TIMESTAMP",
+              ie);
+        }
+      } else {
+        throw new AnalysisException(USAGE
+            + " <expression> must be an integer type or a timestamp, but is '"
+            + fnParamValue_.getType() + "': EXECUTE " + toSql());
+      }
+    }
+  }
+
+  /**
+   * If field fnParamValue_ is a Timestamp, or can be cast to a Timestamp,
+   * then return an Expr for the Timestamp.
+   * @return null if the fnParamValue_ cannot be converted to a Timestamp.
+   */
+  private Expr getParamConvertibleToTimestamp() {
+    Expr timestampExpr = fnParamValue_;
+    if (timestampExpr.getType().isStringType()) {
+      timestampExpr = new CastExpr(Type.TIMESTAMP, fnParamValue_);
+    }
+    if (timestampExpr.getType().isTimestamp()) {
+      return timestampExpr;
+    }
+    return null;
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return fnCallExpr_.toSql();
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteRollbackParams executeRollbackParams =
+        new TAlterTableExecuteRollbackParams();
+    executeParams.setExecute_rollback_params(executeRollbackParams);
+    executeRollbackParams.setKind(kind_);
+    switch (kind_) {
+      case TIME_ID: executeRollbackParams.setTimestamp_millis(olderThanMillis_); break;
+      case VERSION_ID: executeRollbackParams.setSnapshot_id(snapshotVersion_); break;
+      default: throw new IllegalStateException("Bad kind of execute rollback " + kind_);
+    }
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
index 141aa386d..0a0e33206 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
@@ -17,104 +17,63 @@
 
 package org.apache.impala.analysis;
 
-import org.apache.impala.catalog.FeIcebergTable;
-import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.InternalException;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
-import org.apache.impala.thrift.TAlterTableParams;
-import org.apache.impala.thrift.TAlterTableType;
-import org.apache.impala.util.ExprUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Preconditions;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.impala.common.AnalysisException;
+
 /**
  * Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
- * tables, supported operations:
- *  - expire_snapshots(<timestamp>): uses the ExpireSnapshot API to expire snaphosts,
- *    calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
- *    TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
- *    should be retained even when all snapshots are selected by expireOlderThan().
+ * tables. For supported operations see the subclasses.
  */
 public class AlterTableExecuteStmt extends AlterTableStmt {
-  private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
-
-  private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
 
   // Expression of the function call after EXECUTE keyword. Parsed into an operation and
   // a value of that operation.
-  private FunctionCallExpr fnCallExpr_;
-
+  protected FunctionCallExpr fnCallExpr_;
   // Value expression from fnCallExpr_.
-  private Expr fnParamValue_;
-
+  protected Expr fnParamValue_;
   // The value after extracted from fnParamValue_ expression.
-  private long olderThanMillis_ = -1;
+  protected long olderThanMillis_ = -1;
 
   protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
     super(tableName);
-    fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+    fnCallExpr_ = (FunctionCallExpr) fnCallExpr;
   }
 
-  @Override
-  public void analyze(Analyzer analyzer) throws AnalysisException {
-    super.analyze(analyzer);
-    Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
-    analyzeFunctionCallExpr(analyzer);
-    analyzeOlderThan(analyzer);
+  /**
+   * Return an instance of a subclass of AlterTableExecuteStmt that can analyze the
+   * execute statement for the function call expression in 'expr'.
+   */
+  public static AlterTableStmt createExecuteStmt(TableName tableName, Expr expr)
+      throws AnalysisException {
+    FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
+    String functionNameOrig = fnCallExpr.getFnName().toString();
+    String functionName = functionNameOrig.toUpperCase();
+    switch (functionName) {
+      case "EXPIRE_SNAPSHOTS":
+        return new AlterTableExecuteExpireSnapshotsStmt(tableName, fnCallExpr);
+      case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
+      default:
+        throw new AnalysisException(String.format("'%s' is not supported by ALTER "
+                + "TABLE <table> EXECUTE. Supported operations are: "
+                + "EXPIRE_SNAPSHOTS(<expression>), "
+                + "ROLLBACK(<expression>).",
+            functionNameOrig));
+    }
   }
 
-  private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
+  protected void analyzeFunctionCallExpr(Analyzer ignoredAnalyzer, String usage)
+      throws AnalysisException {
     // fnCallExpr_ analyzed here manually, because it is not an actual function but a
     // catalog operation.
     String fnName = fnCallExpr_.getFnName().toString();
-    if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
-      throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
-          "TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
-    }
+    Preconditions.checkState(
+        StringUtils.equalsAnyIgnoreCase(fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK"));
     if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
+      throw new AnalysisException(usage + " must have one parameter: " + toSql());
     }
     fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
   }
 
-  private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
-    Preconditions.checkNotNull(fnParamValue_);
-    fnParamValue_.analyze(analyzer);
-    if (!fnParamValue_.isConstant()) {
-      throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
-    }
-    if (fnParamValue_.getType().isStringType()) {
-      fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
-    }
-    if (!fnParamValue_.getType().isTimestamp()) {
-      throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
-          fnParamValue_.getType() + "': " + fnParamValue_.toSql());
-    }
-    try {
-      olderThanMillis_ =
-          ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
-      LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
-    } catch (InternalException ie) {
-      throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
-          USAGE + ": " + ie.getMessage(), ie);
-    }
-  }
-
-  @Override
-  public String toSql(ToSqlOptions options) {
-    return fnCallExpr_.toSql();
-  }
-
-  @Override
-  public TAlterTableParams toThrift() {
-    TAlterTableParams params = super.toThrift();
-    params.setAlter_type(TAlterTableType.EXECUTE);
-    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
-    executeParams.setOlder_than_millis(olderThanMillis_);
-    params.setSet_execute_params(executeParams);
-    return params;
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 28a82ac03..e2fa4e713 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -178,7 +178,7 @@ public interface FeFsTable extends FeTable {
   Map<Long, ? extends PrunablePartition> getPartitionMap();
 
   /**
-   * @param the index of the target partitioning column
+   * @param col the index of the target partitioning column
    * @return a map from value to a set of partitions for which column 'col'
    * has that value.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3830ffa7e..6e2a3f478 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -165,6 +165,7 @@ import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableAlterColParams;
 import org.apache.impala.thrift.TAlterTableDropColParams;
 import org.apache.impala.thrift.TAlterTableDropPartitionParams;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
 import org.apache.impala.thrift.TAlterTableOrViewSetOwnerParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableReplaceColsParams;
@@ -1367,9 +1368,23 @@ public class CatalogOpExecutor {
           break;
         case EXECUTE:
           Preconditions.checkState(params.isSetSet_execute_params());
-          String summary = IcebergCatalogOpExecutor.alterTableExecute(iceTxn,
-              params.getSet_execute_params());
-          addSummary(response, summary);
+          // All the EXECUTE functions operate only on Iceberg data.
+          needsToUpdateHms = false;
+          TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
+          if (setExecuteParams.isSetExecute_rollback_params()) {
+            String rollbackSummary = IcebergCatalogOpExecutor.alterTableExecuteRollback(
+                iceTxn, tbl, setExecuteParams.getExecute_rollback_params());
+            addSummary(response, rollbackSummary);
+          } else if (setExecuteParams.isSetExpire_snapshots_params()) {
+            String expireSummary =
+                IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(
+                    iceTxn, setExecuteParams.getExpire_snapshots_params());
+            addSummary(response, expireSummary);
+          } else {
+            // Cannot happen, but throw just in case.
+            throw new IllegalStateException(
+                "Alter table execute statement is not implemented.");
+          }
           break;
         case SET_PARTITION_SPEC:
           // Set partition spec uses 'TableOperations', not transactions.
@@ -1393,7 +1408,7 @@ public class CatalogOpExecutor {
           break;
         case REPLACE_COLUMNS:
           // It doesn't make sense to replace all the columns of an Iceberg table as it
-          // would basically make all existing data unaccessible.
+          // would basically make all existing data inaccessible.
         default:
           throw new UnsupportedOperationException(
               "Unsupported ALTER TABLE operation for Iceberg tables: " +
@@ -1414,7 +1429,7 @@ public class CatalogOpExecutor {
 
     if (!needsToUpdateHms) {
       // We don't need to update HMS because either it is already done by Iceberg's
-      // HiveCatalog, or we modified the PARTITION SPEC which is not stored in HMS.
+      // HiveCatalog, or we modified the Iceberg data which is not stored in HMS.
       loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " +
           params.getAlter_type().name());
       catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 75aa6e9a0..10d19f7c5 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -23,16 +23,17 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.BaseReplacePartitions;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotManager;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -42,32 +43,36 @@ import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergOperationParam;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
+import org.apache.impala.thrift.TRollbackType;
 import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
-import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a helper for the CatalogOpExecutor to provide Iceberg related DDL functionality
  * such as creating and dropping tables from Iceberg.
  */
 public class IcebergCatalogOpExecutor {
-  public static final Logger LOG = Logger.getLogger(IcebergCatalogOpExecutor.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(IcebergCatalogOpExecutor.class);
 
   /**
    * Create Iceberg table by Iceberg api
@@ -177,14 +182,49 @@ public class IcebergCatalogOpExecutor {
     tableOp.commit(metadata, newMetadata);
   }
 
-  public static String alterTableExecute(Transaction txn,
-      TAlterTableExecuteParams params) {
+  /**
+   * Use the ExpireSnapshot API to expire snapshots by calling the
+   * ExpireSnapshot.expireOlderThan(timestampMillis) method.
+   * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
+   * should be retained even when all snapshots are selected by expireOlderThan().
+   */
+  public static String alterTableExecuteExpireSnapshots(
+      Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) {
     ExpireSnapshots expireApi = txn.expireSnapshots();
+    Preconditions.checkState(params.isSetOlder_than_millis());
     expireApi.expireOlderThan(params.older_than_millis);
     expireApi.commit();
     return "Snapshots have been expired.";
   }
 
+  /**
+   * Executes an ALTER TABLE EXECUTE ROLLBACK.
+   */
+  public static String alterTableExecuteRollback(
+      Transaction iceTxn, FeIcebergTable tbl, TAlterTableExecuteRollbackParams params) {
+    TRollbackType kind = params.getKind();
+    ManageSnapshots manageSnapshots = iceTxn.manageSnapshots();
+    switch (kind) {
+      case TIME_ID:
+        Preconditions.checkState(params.isSetTimestamp_millis());
+        long timestampMillis = params.getTimestamp_millis();
+        LOG.info("Rollback iceberg table to snapshot before timestamp {}",
+            timestampMillis);
+        manageSnapshots.rollbackToTime(timestampMillis);
+        break;
+      case VERSION_ID:
+        Preconditions.checkState(params.isSetSnapshot_id());
+        long snapshotId = params.getSnapshot_id();
+        LOG.info("Rollback iceberg table to snapshot id {}", snapshotId);
+        manageSnapshots.rollbackTo(snapshotId);
+        break;
+      default: throw new IllegalStateException("Bad kind of execute rollback " + kind);
+    }
+    // Commit the update.
+    manageSnapshots.commit();
+    return "Rollback executed.";
+  }
+
   /**
    * Drops a column from a Iceberg table.
    */
@@ -370,4 +410,4 @@ public class IcebergCatalogOpExecutor {
                     String.valueOf(version));
     updateProps.commit();
   }
-}
+}
\ No newline at end of file
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 6a96ef100..418334be1 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -408,7 +408,8 @@ import org.apache.impala.thrift.TReservedWordsVersion;
         "year", "month", "day", "hour", "minute", "second",
         "begin", "call", "check", "classifier", "close", "identity", "language",
         "localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
-        "period", "result", "return", "sql", "start", "system", "time", "user", "value"
+        "period", "result", "return", "rollback", "sql", "start", "system", "time",
+        "user", "value"
     }));
   }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 5986fb1c8..c4f656d3b 100755
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -4281,8 +4281,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Negative tests
     AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
         "unsupported_operation(123456789);", "'unsupported_operation' is not supported " +
-        "by ALTER TABLE <table> EXECUTE. Supported operation is " +
-        "EXPIRE_SNAPSHOTS(<expression>)");
+        "by ALTER TABLE <table> EXECUTE. Supported operations are: " +
+        "EXPIRE_SNAPSHOTS(<expression>), ROLLBACK(<expression>)");
     AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
         "expire_snapshots(now(), 3);", "EXPIRE_SNAPSHOTS(<expression>) must have one " +
         "parameter: expire_snapshots(now(), 3)");
@@ -4297,6 +4297,49 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " been given to EXPIRE_SNAPSHOTS(<expression>)");
   }
 
+  @Test
+  public void TestAlterExecuteRollback() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('2022-01-04 10:00:00');");
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(123456);");
+    // Timestamp can be an expression.
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(cast('2021-08-09 15:52:45' as timestamp) - interval 2 days + " +
+        "interval 3 hours);");
+
+    // Negative tests
+    AnalysisError("alter table nodb.alltypes execute " +
+        "rollback('2022-01-04 10:00:00');",
+       "Could not resolve table reference: 'nodb.alltypes'");
+    AnalysisError("alter table functional.alltypes execute " +
+        "rollback('2022-01-04 10:00:00');",
+       "ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables: " +
+           "functional.alltypes");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(id);", "EXECUTE ROLLBACK(<expression>): " +
+        "<expression> must be a constant expression: EXECUTE rollback(id)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(3.14);", "EXECUTE ROLLBACK(<expression>): <expression> " +
+        "must be an integer type or a timestamp, but is 'DECIMAL(3,2)': " +
+        "EXECUTE rollback(3.14)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('2021-02-32 15:52:45');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(<expression>): the expression " +
+        "'2021-02-32 15:52:45' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('the beginning');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(<expression>): the expression " +
+        "'the beginning' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(1111,2222);",
+        "EXECUTE ROLLBACK(<expression>): must have one parameter");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('1111');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(<expression>): the expression " +
+        "'1111' cannot be converted to a TIMESTAMP");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
new file mode 100644
index 000000000..4c2c3cd18
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
@@ -0,0 +1,27 @@
+====
+---- QUERY
+# EXECUTE ROLLBACK with an invalid snapshot id on a partitioned Iceberg table.
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK(1)
+---- CATCH
+Cannot roll back to unknown snapshot id: 1
+====
+---- QUERY
+# EXECUTE ROLLBACK to a too old date on a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('2020-08-31 07:58:00')
+---- CATCH
+Cannot roll back, no valid snapshot older than: 1598853480000
+====
+---- QUERY
+# EXECUTE ROLLBACK to an Invalid timestamp expression on a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('1111');
+---- CATCH
+An invalid TIMESTAMP expression has been given to EXECUTE ROLLBACK(<expression>): the expression '1111' cannot be converted to a TIMESTAMP
+====
+---- QUERY
+# EXECUTE ROLLBACK fails on a non-Iceberg table.
+ALTER TABLE functional_parquet.alltypestiny EXECUTE ROLLBACK(1111111)
+---- CATCH
+ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables
+====
\ No newline at end of file
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index dc7ac51c9..0a53c31f7 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -28,6 +28,7 @@ from tests.common.skip import SkipIfFS
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
 from tests.util.filesystem_utils import WAREHOUSE
+from tests.util.iceberg_util import IcebergCatalogs
 
 
 @SkipIfFS.hive
@@ -482,6 +483,7 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
   def test_iceberg_self_events(self, unique_database):
     """This test checks that Impala doesn't refresh Iceberg tables on self events."""
     tbl_name = unique_database + ".test_iceberg_events"
+    iceberg_catalogs = IcebergCatalogs(unique_database)
 
     def check_self_events(query, skips_events=True):
       tbls_refreshed_before, partitions_refreshed_before, \
@@ -495,19 +497,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
       if skips_events:
         assert events_skipped_after > events_skipped_before
 
-    hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
-    hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " +
-        "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
-            unique_database))
-    hive_catalog = "'iceberg.catalog'='hive.catalog'"
-    hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
-    hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
-
-    all_catalogs = [hadoop_tables, hadoop_catalog, hive_catalog, hive_catalogs,
-        hadoop_catalogs]
-
-    for catalog in all_catalogs:
-      is_hive_catalog = catalog == hive_catalog or catalog == hive_catalogs
+    for catalog in iceberg_catalogs.get_iceberg_catalog_properties():
+      is_hive_catalog = iceberg_catalogs.is_a_hive_catalog(catalog)
       self.client.execute("""
           CREATE TABLE {0} (i int) STORED AS ICEBERG
           TBLPROPERTIES ({1})""".format(tbl_name, catalog))
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 6f64ced0e..a8e05baf5 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -22,6 +22,8 @@ import logging
 import os
 import pytest
 import random
+
+import re
 import time
 
 from subprocess import check_call
@@ -33,14 +35,14 @@ import json
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
-from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfLocal
+from tests.common.skip import SkipIf, SkipIfDockerizedCluster
 from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
 from tests.shell.util import run_impala_shell_cmd
 from tests.util.filesystem_utils import get_fs_path, IS_HDFS
 from tests.util.get_parquet_metadata import get_parquet_metadata
-from tests.util.iceberg_util import cast_ts, quote, parse_timestamp
+from tests.util.iceberg_util import cast_ts, quote, get_snapshots, IcebergCatalogs
 
 LOG = logging.getLogger(__name__)
 
@@ -69,58 +71,61 @@ class TestIcebergTable(IcebergTestSuite):
 
   def test_expire_snapshots(self, unique_database):
     tbl_name = unique_database + ".expire_snapshots"
-
-    # We are setting the TIMEZONE query option in this test, so let's create a local
-    # impala client.
-    with self.create_impala_client() as impalad_client:
-      # Iceberg doesn't create a snapshot entry for the initial empty table
-      impalad_client.execute("create table {0} (i int) stored as iceberg"
-          .format(tbl_name))
-      ts_0 = datetime.datetime.now()
-      insert_q = "insert into {0} values (1)".format(tbl_name)
-      ts_1 = self.execute_query_ts(impalad_client, insert_q)
-      time.sleep(5)
-      impalad_client.execute(insert_q)
-      time.sleep(5)
-      ts_2 = self.execute_query_ts(impalad_client, insert_q)
-      impalad_client.execute(insert_q)
-
-      # There should be 4 snapshots initially
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
-      # Expire the oldest snapshot and test that the oldest one was expired
-      expire_q = "alter table {0} execute expire_snapshots({1})"
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
-
-      # Expire with a timestamp in which the interval does not touch existing snapshot
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
-
-      # Expire all, but retain 1
-      impalad_client.execute(expire_q.format(tbl_name,
-          cast_ts(datetime.datetime.now())))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
-
-      # Change number of retained snapshots, then expire all
-      impalad_client.execute("""alter table {0} set tblproperties
-          ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
-      impalad_client.execute(insert_q)
-      impalad_client.execute(insert_q)
-      impalad_client.execute(expire_q.format(tbl_name,
-          cast_ts(datetime.datetime.now())))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
-
-      # Check that timezone is interpreted in local timezone controlled by query option
-      # TIMEZONE.
-      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      impalad_client.execute(insert_q)
-      ts_tokyo = self.impala_now(impalad_client)
-      impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
-      impalad_client.execute(insert_q)
-      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+    iceberg_catalogs = IcebergCatalogs(unique_database)
+    for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+      # We are setting the TIMEZONE query option in this test, so let's create a local
+      # impala client.
+      with self.create_impala_client() as impalad_client:
+        # Iceberg doesn't create a snapshot entry for the initial empty table
+        impalad_client.execute("""
+          create table {0} (i int) stored as iceberg
+          TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+        ts_0 = datetime.datetime.now()
+        insert_q = "insert into {0} values (1)".format(tbl_name)
+        ts_1 = self.execute_query_ts(impalad_client, insert_q)
+        time.sleep(5)
+        impalad_client.execute(insert_q)
+        time.sleep(5)
+        ts_2 = self.execute_query_ts(impalad_client, insert_q)
+        impalad_client.execute(insert_q)
+
+        # There should be 4 snapshots initially
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
+        # Expire the oldest snapshot and test that the oldest one was expired
+        expire_q = "alter table {0} execute expire_snapshots({1})"
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
+
+        # Expire with a timestamp in which the interval does not touch existing snapshot
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+
+        # Expire all, but retain 1
+        impalad_client.execute(expire_q.format(tbl_name,
+            cast_ts(datetime.datetime.now())))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
+
+        # Change number of retained snapshots, then expire all
+        impalad_client.execute("""alter table {0} set tblproperties
+            ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
+        impalad_client.execute(insert_q)
+        impalad_client.execute(insert_q)
+        impalad_client.execute(expire_q.format(tbl_name,
+            cast_ts(datetime.datetime.now())))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
+
+        # Check that timezone is interpreted in local timezone controlled by query option
+        # TIMEZONE.
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        impalad_client.execute(insert_q)
+        ts_tokyo = self.impala_now(impalad_client)
+        impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
+        impalad_client.execute(insert_q)
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+        impalad_client.execute("DROP TABLE {0}".format(tbl_name))
 
   def test_truncate_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
@@ -128,7 +133,7 @@ class TestIcebergTable(IcebergTestSuite):
   # With IMPALA-11429 there is an extra "ALTER TABLE SET OWNER" right after executing
   # "CREATE TABLE". As a result dropping the table location right after CREATE TABLE will
   # trigger a known bug: IMPALA-11509. Hence, turning this test off until there is a fix
-  # for this issue. Note, we could add a sleep righ after table creation that could
+  # for this issue. Note, we could add a sleep right after table creation that could
   # workaround the above mentioned bug but then we would hit another issue: IMPALA-11502.
   @SkipIf.not_dfs
   def test_drop_incomplete_table(self, vector, unique_database):
@@ -231,31 +236,127 @@ class TestIcebergTable(IcebergTestSuite):
         tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name))
     self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name))
     self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name))
-    result = self.client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
-    assert(len(result.data) == 2)
-    first_snapshot = result.data[0].split("\t")
-    second_snapshot = result.data[1].split("\t")
+    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2)
+    first_snapshot = snapshots[0]
+    second_snapshot = snapshots[1]
     # Check that first snapshot is older than the second snapshot.
-    assert(first_snapshot[0] < second_snapshot[0])
+    assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time())
     # Check that second snapshot's parent ID is the snapshot ID of the first snapshot.
-    assert(first_snapshot[1] == second_snapshot[2])
+    assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id())
     # The first snapshot has no parent snapshot ID.
-    assert(first_snapshot[2] == "NULL")
+    assert(first_snapshot.get_parent_id() is None)
     # Check "is_current_ancestor" column.
-    assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE")
+    assert(first_snapshot.is_current_ancestor())
+    assert(second_snapshot.is_current_ancestor())
+
+  def test_execute_rollback_negative(self, vector):
+    """Negative test for EXECUTE ROLLBACK."""
+    self.run_test_case('QueryTest/iceberg-rollback-negative', vector)
+
+  def test_execute_rollback(self, unique_database):
+    """Test for EXECUTE ROLLBACK."""
+    iceberg_catalogs = IcebergCatalogs(unique_database)
+    for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+      # Create a table with multiple snapshots.
+      tbl_name = unique_database + ".iceberg_execute_rollback"
+      # We are setting the TIMEZONE query option in this test, so let's create a local
+      # impala client.
+      with self.create_impala_client() as impalad_client:
+        impalad_client.execute("""
+          create table {0} (i int) stored as iceberg
+          TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+        initial_snapshots = 3
+        for i in range(initial_snapshots):
+          impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i))
+        snapshots = get_snapshots(impalad_client, tbl_name,
+          expected_result_size=initial_snapshots)
+
+        output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id())
+        LOG.info("success output={0}".format(output))
+
+        # We rolled back, but that creates a new snapshot, so now there are 4.
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4)
+        # The new snapshot has the same id (and parent id) as the snapshot we rolled back
+        # to, but it has a different creation time.
+        assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id()
+        assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id()
+        assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time()
+        # The "orphaned" snapshot is now not a current ancestor.
+        assert not snapshots[2].is_current_ancestor()
+
+        # We cannot roll back to a snapshot that is not a current ancestor.
+        output = self.rollback_to_id_expect_failure(tbl_name,
+            snapshots[2].get_snapshot_id(),
+            expected_text="Cannot roll back to snapshot, not an ancestor of the current "
+                          "state")
+
+        # Create another snapshot.
+        before_insert = datetime.datetime.now()
+        impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+
+        # Rollback to before the last insert.
+        self.rollback_to_ts(impalad_client, tbl_name, before_insert)
+        # This creates another snapshot.
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6)
+        # The snapshot id is the same, the dates differ
+        assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id()
+        assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time()
+        assert not snapshots[4].is_current_ancestor()
+
+        # Show that the EXECUTE ROLLBACK is respecting the current timezone.
+        # To do this we try to roll back to a time for which there is no
+        # snapshot, this will fail with an error message that includes the specified
+        # time. We parse out that time. By doing this in two timezones we can see
+        # that the parameter being used was affected by the current timezone.
+        one_hour_ago = before_insert - datetime.timedelta(hours=1)
+        # We use Timezones from Japan and Iceland to avoid any DST complexities.
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        japan_ts = self.get_snapshot_ts_from_failed_rollback(
+            impalad_client, tbl_name, one_hour_ago)
+        impalad_client.execute("SET TIMEZONE='Iceland'")
+        iceland_ts = self.get_snapshot_ts_from_failed_rollback(
+            impalad_client, tbl_name, one_hour_ago)
+        diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60)
+        assert diff_hours == 9
+
+        impalad_client.execute("DROP TABLE {0}".format(tbl_name))
+
+  def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts):
+    """Run an EXECUTE ROLLBACK which is expected to fail.
+    Parse the error message to extract the timestamp for which there
+    was no snapshot, and convert the string to an integer"""
+    try:
+      self.rollback_to_ts(client, tbl_name, ts)
+      assert False, "Query should have failed"
+    except ImpalaBeeswaxException as e:
+      result = re.search(r".*no valid snapshot older than: (\d+)", str(e))
+      time_str = result.group(1)
+      snapshot_ts = int(time_str)
+      assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result)
+      return snapshot_ts
+
+  def rollback_to_ts(self, client, tbl_name, ts):
+    """Rollback a table to a snapshot timestamp."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat())
+    return self.execute_query_expect_success(client, query)
+
+  def rollback_to_id(self, tbl_name, id):
+    """Rollback a table to a snapshot id."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+    return self.execute_query_expect_success(self.client, query)
+
+  def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None):
+    """Attempt to roll back a table to a snapshot id, expecting a failure."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+    output = self.execute_query_expect_failure(self.client, query)
+    if expected_text:
+      assert expected_text in str(output)
+    return output
 
   def test_describe_history_params(self, unique_database):
     tbl_name = unique_database + ".describe_history"
 
-    def expect_results_between(ts_start, ts_end, expected_result_size):
-      query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format(
-        tbl_name, cast_ts(ts_start), cast_ts(ts_end))
-      data = impalad_client.execute(query)
-      assert len(data.data) == expected_result_size
-      for i in range(len(data.data)):
-        result_ts_dt = parse_timestamp(data.data[i].split('\t')[0])
-        assert result_ts_dt >= ts_start and result_ts_dt <= ts_end
-
     # We are setting the TIMEZONE query option in this test, so let's create a local
     # impala client.
     with self.create_impala_client() as impalad_client:
@@ -279,10 +380,11 @@ class TestIcebergTable(IcebergTestSuite):
       self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
 
       # Describe history with BETWEEN <ts> AND <ts> predicate
-      expect_results_between(ts_1, ts_2, 1)
-      expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2, 2)
-      expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2 +
-          datetime.timedelta(hours=1), 3)
+      self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1)
+      self.expect_results_between(impalad_client, tbl_name,
+          ts_1 - datetime.timedelta(hours=1), ts_2, 2)
+      self.expect_results_between(impalad_client, tbl_name,
+          ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3)
 
       # Check that timezone is interpreted in local timezone controlled by query option
       # TIMEZONE. Persist the local times first and create a new snapshot.
@@ -341,14 +443,6 @@ class TestIcebergTable(IcebergTestSuite):
               tbl_name, snapshot_id),
           expected)
 
-    def get_snapshots():
-      data = impalad_client.execute("describe history {0}".format(tbl_name))
-      ret = list()
-      for row in data.data:
-        fields = row.split('\t')
-        ret.append(fields[1])
-      return ret
-
     def impala_now():
       now_data = impalad_client.execute("select now()")
       return now_data.data[0]
@@ -409,12 +503,12 @@ class TestIcebergTable(IcebergTestSuite):
                        ij_cols)
 
       # Query table as of snapshot IDs.
-      snapshots = get_snapshots()
-      expect_results_v(snapshots[0], ['1'], i_cols)
-      expect_results_v(snapshots[1], ['1', '2'], i_cols)
-      expect_results_v(snapshots[2], [], i_cols)
-      expect_results_v(snapshots[3], ['100'], i_cols)
-      expect_results_v(snapshots[4], ['100\tNULL', '3\t103'], ij_cols)
+      snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+      expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols)
+      expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols)
+      expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols)
+      expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols)
+      expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols)
 
       # Test of plain count star optimization
       # 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile
@@ -427,11 +521,11 @@ class TestIcebergTable(IcebergTestSuite):
       expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
       expect_for_count_star_t(cast_ts(ts_5), '2')
       expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2')
-      expect_for_count_star_v(snapshots[0], '1')
-      expect_for_count_star_v(snapshots[1], '2')
-      expect_for_count_star_v(snapshots[2], '0')
-      expect_for_count_star_v(snapshots[3], '1')
-      expect_for_count_star_v(snapshots[4], '2')
+      expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1')
+      expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2')
+      expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0')
+      expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1')
+      expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2')
 
       # SELECT diff
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
@@ -442,18 +536,19 @@ class TestIcebergTable(IcebergTestSuite):
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
                         MINUS
                         SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
-                     tbl=tbl_name, v_new=snapshots[1], v_old=snapshots[0]),
+                     tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(),
+                     v_old=snapshots[0].get_snapshot_id()),
                      ['2'], i_cols)
       # Mix SYSTEM_TIME and SYSTEM_VERSION
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
                         MINUS
                         SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
-                     tbl=tbl_name, v_new=snapshots[1], ts_old=ts_1),
+                     tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1),
                      ['2'], i_cols)
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
                         MINUS
                         SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
-                     tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0]),
+                     tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()),
                      ['2'], i_cols)
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
                         MINUS
diff --git a/tests/util/iceberg_util.py b/tests/util/iceberg_util.py
index 924b5e249..21703acfd 100644
--- a/tests/util/iceberg_util.py
+++ b/tests/util/iceberg_util.py
@@ -106,3 +106,34 @@ def get_snapshots(impalad_client, tbl_name, ts_start=None, ts_end=None,
   for snapshot_str in rows.data:
     results.append(Snapshot(snapshot_str))
   return results
+
+
+class IcebergCatalogs:
+  """Utility class to generate TBLPROPERTIES corresponding to various iceberg catalogs."""
+
+  def __init__(self, database):
+    """Create a IcebergCatalogs object parameterized by database name."""
+    self.database = database
+
+  hive_catalog = "'iceberg.catalog'='hive.catalog'"
+  hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
+  hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
+  hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
+
+  def get_iceberg_catalog_properties(self):
+    """Return a list containing TBLPROPERTIES corresponding to various iceberg catalogs.
+     The TBLPROPERTIES can be used to create tables."""
+    hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', "
+        + "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
+          self.database))
+    return [
+      self.hadoop_tables,
+      self.hive_catalog,
+      self.hive_catalogs,
+      self.hadoop_catalogs,
+      hadoop_catalog
+    ]
+
+  def is_a_hive_catalog(self, catalog):
+    """Return true if the catalog property is for a Hive catalog."""
+    return catalog == self.hive_catalog or catalog == self.hive_catalogs