You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2023/03/10 16:35:01 UTC

[impala] 01/02: IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 29586d6631d97acf13f208564db7c8c072154019
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Tue Sep 13 18:29:57 2022 -0700

    IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.
    
    Iceberg table modifications cause new table snapshots to be created;
    these snapshots represent an earlier version of the table. The Iceberg
    API provides a way to rollback the table to a previous snapshot.
    
    This change adds the ability to execute a rollback on Iceberg tables
    using the following statements:
    
    - ALTER TABLE <tbl> EXECUTE ROLLBACK(<snapshot id>)
    - ALTER TABLE <tbl> EXECUTE ROLLBACK('<timestamp>')
    
    The latter form of the command rolls back to the most recent snapshot
    that has a creation timestamp that is older than the specified
    timestamp.
    
    Note that when a table is rolled back to a snapshot, a new snapshot is
    created with the same snapshot id, but with a new creation timestamp.
    
    Testing:
     - Added analysis unit tests.
     - Added e2e tests.
     - Converted test_time_travel to use get_snapshots() from iceberg_util.
     - Add a utility class to allow pytests to create tables with various
       iceberg catalogs.
    
    Change-Id: Ic74913d3b81103949ffb5eef7cc936303494f8b9
    Reviewed-on: http://gerrit.cloudera.org:8080/19002
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/JniCatalog.thrift                    |  34 ++-
 fe/src/main/cup/sql-parser.cup                     |   3 +-
 ...a => AlterTableExecuteExpireSnapshotsStmt.java} |  93 +++----
 .../analysis/AlterTableExecuteRollbackStmt.java    | 147 +++++++++++
 .../impala/analysis/AlterTableExecuteStmt.java     | 107 +++-----
 .../java/org/apache/impala/catalog/FeFsTable.java  |   2 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  25 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |  56 +++-
 fe/src/main/jflex/sql-scanner.flex                 |   3 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  47 +++-
 .../QueryTest/iceberg-rollback-negative.test       |  27 ++
 tests/custom_cluster/test_events_custom_configs.py |  17 +-
 tests/query_test/test_iceberg.py                   | 291 ++++++++++++++-------
 tests/util/iceberg_util.py                         |  31 +++
 14 files changed, 618 insertions(+), 265 deletions(-)

diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index c41762a6f..56398eaf0 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -414,12 +414,38 @@ struct TAlterTableSetPartitionSpecParams {
   1: required CatalogObjects.TIcebergPartitionSpec partition_spec
 }
 
-// Parameters for ALTER TABLE EXECUTE operations.
-struct TAlterTableExecuteParams {
-  // The parameter of the ExpireSnapshot.expireOlderThan(timestampMillis) Iceberg call.
+// Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS operations.
+struct TAlterTableExecuteExpireSnapshotsParams {
   1: required i64 older_than_millis
 }
 
+// ALTER TABLE EXECUTE ROLLBACK can be to a date or snapshot id.
+enum TRollbackType {
+  TIME_ID = 0
+  VERSION_ID = 1
+}
+
+// Parameters for ALTER TABLE EXECUTE ROLLBACK operations.
+struct TAlterTableExecuteRollbackParams {
+  // Is rollback to a date or snapshot id.
+  1: required TRollbackType kind
+
+  // If kind is TIME_ID this is the date to rollback to.
+  2: optional i64 timestamp_millis
+
+  // If kind is VERSION_ID this is the id to rollback to.
+  3: optional i64 snapshot_id
+}
+
+// Parameters for ALTER TABLE EXECUTE ... operations.
+struct TAlterTableExecuteParams {
+  // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
+  1: optional TAlterTableExecuteExpireSnapshotsParams expire_snapshots_params
+
+  // Parameters for ALTER TABLE EXECUTE ROLLBACK
+  2: optional TAlterTableExecuteRollbackParams execute_rollback_params
+}
+
 // Parameters for all ALTER TABLE commands.
 struct TAlterTableParams {
   1: required TAlterTableType alter_type
@@ -478,7 +504,7 @@ struct TAlterTableParams {
   // Parameters for ALTER TABLE SET PARTITION SPEC
   19: optional TAlterTableSetPartitionSpecParams set_partition_spec_params
 
-  // Parameters for ALTER TABLE EXECUTE
+  // Parameters for ALTER TABLE EXECUTE operations
   20: optional TAlterTableExecuteParams set_execute_params
 }
 
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index dcf8eba5c..b136a4631 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -37,6 +37,7 @@ import org.apache.impala.analysis.SetOperationStmt.SetOperator;
 import org.apache.impala.analysis.RangePartition;
 import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
+import org.apache.impala.analysis.AlterTableExecuteStmt;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionTransform;
@@ -1361,7 +1362,7 @@ alter_tbl_stmt ::=
   :}
   | KW_ALTER KW_TABLE table_name:table KW_EXECUTE function_call_expr:expr
   {:
-    RESULT = new AlterTableExecuteStmt(table, expr);
+    RESULT = AlterTableExecuteStmt.createExecuteStmt(table, expr);
   :}
   ;
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
similarity index 54%
copy from fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
copy to fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
index 141aa386d..16d8a3753 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
@@ -17,10 +17,13 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
 import org.apache.impala.thrift.TAlterTableExecuteParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableType;
@@ -28,59 +31,47 @@ import org.apache.impala.util.ExprUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
- * Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
- * tables, supported operations:
- *  - expire_snapshots(<timestamp>): uses the ExpireSnapshot API to expire snaphosts,
- *    calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
- *    TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
- *    should be retained even when all snapshots are selected by expireOlderThan().
+ * Represents an ALTER TABLE <tbl> EXECUTE EXPIRE_SNAPSHOTS(<parameters>) statement on
+ * Iceberg tables, the parameter is (<timestamp>).
  */
-public class AlterTableExecuteStmt extends AlterTableStmt {
-  private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
-
-  private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
+public class AlterTableExecuteExpireSnapshotsStmt extends AlterTableExecuteStmt {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AlterTableExecuteExpireSnapshotsStmt.class);
 
-  // Expression of the function call after EXECUTE keyword. Parsed into an operation and
-  // a value of that operation.
-  private FunctionCallExpr fnCallExpr_;
+  protected final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
 
-  // Value expression from fnCallExpr_.
-  private Expr fnParamValue_;
-
-  // The value after extracted from fnParamValue_ expression.
-  private long olderThanMillis_ = -1;
-
-  protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
-    super(tableName);
-    fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+  protected AlterTableExecuteExpireSnapshotsStmt(TableName tableName, Expr fnCallExpr) {
+    super(tableName, fnCallExpr);
   }
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
     Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
-    analyzeFunctionCallExpr(analyzer);
+    analyzeFunctionCallExpr(analyzer, USAGE);
     analyzeOlderThan(analyzer);
   }
 
-  private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
-    // fnCallExpr_ analyzed here manually, because it is not an actual function but a
-    // catalog operation.
-    String fnName = fnCallExpr_.getFnName().toString();
-    if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
-      throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
-          "TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
-    }
-    if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
-    }
-    fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return fnCallExpr_.toSql();
   }
 
-  private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteExpireSnapshotsParams executeExpireSnapshotsParams =
+        new TAlterTableExecuteExpireSnapshotsParams();
+    executeParams.setExpire_snapshots_params(executeExpireSnapshotsParams);
+    executeExpireSnapshotsParams.setOlder_than_millis(olderThanMillis_);
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+
+  protected void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
     Preconditions.checkNotNull(fnParamValue_);
     fnParamValue_.analyze(analyzer);
     if (!fnParamValue_.isConstant()) {
@@ -90,31 +81,17 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
     }
     if (!fnParamValue_.getType().isTimestamp()) {
-      throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
-          fnParamValue_.getType() + "': " + fnParamValue_.toSql());
+      throw new AnalysisException(USAGE + " must be a timestamp type but is '"
+          + fnParamValue_.getType() + "': " + fnParamValue_.toSql());
     }
     try {
       olderThanMillis_ =
           ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
-      LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
+      LOG.debug(USAGE + " millis: " + olderThanMillis_);
     } catch (InternalException ie) {
-      throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
-          USAGE + ": " + ie.getMessage(), ie);
+      throw new AnalysisException("Invalid TIMESTAMP expression has been given to "
+              + USAGE + ": " + ie.getMessage(),
+          ie);
     }
   }
-
-  @Override
-  public String toSql(ToSqlOptions options) {
-    return fnCallExpr_.toSql();
-  }
-
-  @Override
-  public TAlterTableParams toThrift() {
-    TAlterTableParams params = super.toThrift();
-    params.setAlter_type(TAlterTableType.EXECUTE);
-    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
-    executeParams.setOlder_than_millis(olderThanMillis_);
-    params.setSet_execute_params(executeParams);
-    return params;
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
new file mode 100644
index 000000000..ea4da0b67
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TRollbackType;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an ALTER TABLE EXECUTE ROLLBACK(<parameter>) statement.
+ * The parameter can be a snapshot id, or a timestamp.
+ * A rollback to a snapshot id causes a new snapshot to be
+ * created with the same snapshot id, but with a new creation timestamp.
+ * A rollback to a timestamp rolls back to the latest snapshot
+ * that has a creation timestamp that is older than the specified
+ * timestamp.
+ */
+public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
+  public static final String USAGE = "EXECUTE ROLLBACK(<expression>):";
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AlterTableExecuteRollbackStmt.class);
+  private long snapshotVersion_;
+  private TRollbackType kind_;
+
+  public AlterTableExecuteRollbackStmt(TableName tableName, FunctionCallExpr fnCallExpr) {
+    super(tableName, fnCallExpr);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    FeTable table = getTargetTable();
+    if (!(table instanceof FeIcebergTable)) {
+      throw new AnalysisException("ALTER TABLE EXECUTE ROLLBACK is only supported "
+          + "for Iceberg tables: " + table.getTableName());
+    }
+    analyzeFunctionCallExpr(analyzer, USAGE);
+    analyzeParameter(analyzer);
+  }
+
+  private void analyzeParameter(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(fnParamValue_);
+    fnParamValue_.analyze(analyzer);
+
+    if (!fnParamValue_.isConstant()) {
+      throw new AnalysisException(
+          USAGE + " <expression> must be a constant expression: EXECUTE " + toSql());
+    }
+    if ((fnParamValue_ instanceof LiteralExpr)
+        && (fnParamValue_.getType().isIntegerType())) {
+      // Parameter is a snapshot id
+      kind_ = TRollbackType.VERSION_ID;
+      snapshotVersion_ = fnParamValue_.evalToInteger(analyzer, USAGE);
+      if (snapshotVersion_ < 0) {
+        throw new AnalysisException("Invalid version number has been given to " + USAGE
+            + ": " + snapshotVersion_);
+      }
+      LOG.debug(USAGE + " version: " + snapshotVersion_);
+    } else {
+      Expr timestampEpr = getParamConvertibleToTimestamp();
+      if (timestampEpr != null) {
+        // Parameter is a timestamp.
+        kind_ = TRollbackType.TIME_ID;
+        try {
+          olderThanMillis_ =
+              ExprUtil.localTimestampToUnixTimeMicros(analyzer, timestampEpr) / 1000;
+          LOG.debug(USAGE + " millis: " + olderThanMillis_);
+        } catch (InternalException ie) {
+          throw new AnalysisException("An invalid TIMESTAMP expression has been given "
+                  + "to " + USAGE + " the expression " + fnParamValue_.toSql()
+                  + " cannot be converted to a TIMESTAMP",
+              ie);
+        }
+      } else {
+        throw new AnalysisException(USAGE
+            + " <expression> must be an integer type or a timestamp, but is '"
+            + fnParamValue_.getType() + "': EXECUTE " + toSql());
+      }
+    }
+  }
+
+  /**
+   * If field fnParamValue_ is a Timestamp, or can be cast to a Timestamp,
+   * then return an Expr for the Timestamp.
+   * @return null if the fnParamValue_ cannot be converted to a Timestamp.
+   */
+  private Expr getParamConvertibleToTimestamp() {
+    Expr timestampExpr = fnParamValue_;
+    if (timestampExpr.getType().isStringType()) {
+      timestampExpr = new CastExpr(Type.TIMESTAMP, fnParamValue_);
+    }
+    if (timestampExpr.getType().isTimestamp()) {
+      return timestampExpr;
+    }
+    return null;
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return fnCallExpr_.toSql();
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteRollbackParams executeRollbackParams =
+        new TAlterTableExecuteRollbackParams();
+    executeParams.setExecute_rollback_params(executeRollbackParams);
+    executeRollbackParams.setKind(kind_);
+    switch (kind_) {
+      case TIME_ID: executeRollbackParams.setTimestamp_millis(olderThanMillis_); break;
+      case VERSION_ID: executeRollbackParams.setSnapshot_id(snapshotVersion_); break;
+      default: throw new IllegalStateException("Bad kind of execute rollback " + kind_);
+    }
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
index 141aa386d..0a0e33206 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
@@ -17,104 +17,63 @@
 
 package org.apache.impala.analysis;
 
-import org.apache.impala.catalog.FeIcebergTable;
-import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.InternalException;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
-import org.apache.impala.thrift.TAlterTableParams;
-import org.apache.impala.thrift.TAlterTableType;
-import org.apache.impala.util.ExprUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Preconditions;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.impala.common.AnalysisException;
+
 /**
  * Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
- * tables, supported operations:
- *  - expire_snapshots(<timestamp>): uses the ExpireSnapshot API to expire snaphosts,
- *    calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
- *    TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
- *    should be retained even when all snapshots are selected by expireOlderThan().
+ * tables. For supported operations see the subclasses.
  */
 public class AlterTableExecuteStmt extends AlterTableStmt {
-  private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
-
-  private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
 
   // Expression of the function call after EXECUTE keyword. Parsed into an operation and
   // a value of that operation.
-  private FunctionCallExpr fnCallExpr_;
-
+  protected FunctionCallExpr fnCallExpr_;
   // Value expression from fnCallExpr_.
-  private Expr fnParamValue_;
-
+  protected Expr fnParamValue_;
   // The value after extracted from fnParamValue_ expression.
-  private long olderThanMillis_ = -1;
+  protected long olderThanMillis_ = -1;
 
   protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
     super(tableName);
-    fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+    fnCallExpr_ = (FunctionCallExpr) fnCallExpr;
   }
 
-  @Override
-  public void analyze(Analyzer analyzer) throws AnalysisException {
-    super.analyze(analyzer);
-    Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
-    analyzeFunctionCallExpr(analyzer);
-    analyzeOlderThan(analyzer);
+  /**
+   * Return an instance of a subclass of AlterTableExecuteStmt that can analyze the
+   * execute statement for the function call expression in 'expr'.
+   */
+  public static AlterTableStmt createExecuteStmt(TableName tableName, Expr expr)
+      throws AnalysisException {
+    FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
+    String functionNameOrig = fnCallExpr.getFnName().toString();
+    String functionName = functionNameOrig.toUpperCase();
+    switch (functionName) {
+      case "EXPIRE_SNAPSHOTS":
+        return new AlterTableExecuteExpireSnapshotsStmt(tableName, fnCallExpr);
+      case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
+      default:
+        throw new AnalysisException(String.format("'%s' is not supported by ALTER "
+                + "TABLE <table> EXECUTE. Supported operations are: "
+                + "EXPIRE_SNAPSHOTS(<expression>), "
+                + "ROLLBACK(<expression>).",
+            functionNameOrig));
+    }
   }
 
-  private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
+  protected void analyzeFunctionCallExpr(Analyzer ignoredAnalyzer, String usage)
+      throws AnalysisException {
     // fnCallExpr_ analyzed here manually, because it is not an actual function but a
     // catalog operation.
     String fnName = fnCallExpr_.getFnName().toString();
-    if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
-      throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
-          "TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
-    }
+    Preconditions.checkState(
+        StringUtils.equalsAnyIgnoreCase(fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK"));
     if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
+      throw new AnalysisException(usage + " must have one parameter: " + toSql());
     }
     fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
   }
 
-  private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
-    Preconditions.checkNotNull(fnParamValue_);
-    fnParamValue_.analyze(analyzer);
-    if (!fnParamValue_.isConstant()) {
-      throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
-    }
-    if (fnParamValue_.getType().isStringType()) {
-      fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
-    }
-    if (!fnParamValue_.getType().isTimestamp()) {
-      throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
-          fnParamValue_.getType() + "': " + fnParamValue_.toSql());
-    }
-    try {
-      olderThanMillis_ =
-          ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
-      LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
-    } catch (InternalException ie) {
-      throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
-          USAGE + ": " + ie.getMessage(), ie);
-    }
-  }
-
-  @Override
-  public String toSql(ToSqlOptions options) {
-    return fnCallExpr_.toSql();
-  }
-
-  @Override
-  public TAlterTableParams toThrift() {
-    TAlterTableParams params = super.toThrift();
-    params.setAlter_type(TAlterTableType.EXECUTE);
-    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
-    executeParams.setOlder_than_millis(olderThanMillis_);
-    params.setSet_execute_params(executeParams);
-    return params;
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 28a82ac03..e2fa4e713 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -178,7 +178,7 @@ public interface FeFsTable extends FeTable {
   Map<Long, ? extends PrunablePartition> getPartitionMap();
 
   /**
-   * @param the index of the target partitioning column
+   * @param col the index of the target partitioning column
    * @return a map from value to a set of partitions for which column 'col'
    * has that value.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3830ffa7e..6e2a3f478 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -165,6 +165,7 @@ import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableAlterColParams;
 import org.apache.impala.thrift.TAlterTableDropColParams;
 import org.apache.impala.thrift.TAlterTableDropPartitionParams;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
 import org.apache.impala.thrift.TAlterTableOrViewSetOwnerParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableReplaceColsParams;
@@ -1367,9 +1368,23 @@ public class CatalogOpExecutor {
           break;
         case EXECUTE:
           Preconditions.checkState(params.isSetSet_execute_params());
-          String summary = IcebergCatalogOpExecutor.alterTableExecute(iceTxn,
-              params.getSet_execute_params());
-          addSummary(response, summary);
+          // All the EXECUTE functions operate only on Iceberg data.
+          needsToUpdateHms = false;
+          TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
+          if (setExecuteParams.isSetExecute_rollback_params()) {
+            String rollbackSummary = IcebergCatalogOpExecutor.alterTableExecuteRollback(
+                iceTxn, tbl, setExecuteParams.getExecute_rollback_params());
+            addSummary(response, rollbackSummary);
+          } else if (setExecuteParams.isSetExpire_snapshots_params()) {
+            String expireSummary =
+                IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(
+                    iceTxn, setExecuteParams.getExpire_snapshots_params());
+            addSummary(response, expireSummary);
+          } else {
+            // Cannot happen, but throw just in case.
+            throw new IllegalStateException(
+                "Alter table execute statement is not implemented.");
+          }
           break;
         case SET_PARTITION_SPEC:
           // Set partition spec uses 'TableOperations', not transactions.
@@ -1393,7 +1408,7 @@ public class CatalogOpExecutor {
           break;
         case REPLACE_COLUMNS:
           // It doesn't make sense to replace all the columns of an Iceberg table as it
-          // would basically make all existing data unaccessible.
+          // would basically make all existing data inaccessible.
         default:
           throw new UnsupportedOperationException(
               "Unsupported ALTER TABLE operation for Iceberg tables: " +
@@ -1414,7 +1429,7 @@ public class CatalogOpExecutor {
 
     if (!needsToUpdateHms) {
       // We don't need to update HMS because either it is already done by Iceberg's
-      // HiveCatalog, or we modified the PARTITION SPEC which is not stored in HMS.
+      // HiveCatalog, or we modified the Iceberg data which is not stored in HMS.
       loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " +
           params.getAlter_type().name());
       catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 75aa6e9a0..10d19f7c5 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -23,16 +23,17 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.BaseReplacePartitions;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotManager;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -42,32 +43,36 @@ import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergOperationParam;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
+import org.apache.impala.thrift.TRollbackType;
 import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
-import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a helper for the CatalogOpExecutor to provide Iceberg related DDL functionality
  * such as creating and dropping tables from Iceberg.
  */
 public class IcebergCatalogOpExecutor {
-  public static final Logger LOG = Logger.getLogger(IcebergCatalogOpExecutor.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(IcebergCatalogOpExecutor.class);
 
   /**
    * Create Iceberg table by Iceberg api
@@ -177,14 +182,49 @@ public class IcebergCatalogOpExecutor {
     tableOp.commit(metadata, newMetadata);
   }
 
-  public static String alterTableExecute(Transaction txn,
-      TAlterTableExecuteParams params) {
+  /**
+   * Use the ExpireSnapshot API to expire snapshots by calling the
+   * ExpireSnapshot.expireOlderThan(timestampMillis) method.
+   * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
+   * should be retained even when all snapshots are selected by expireOlderThan().
+   */
+  public static String alterTableExecuteExpireSnapshots(
+      Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) {
     ExpireSnapshots expireApi = txn.expireSnapshots();
+    Preconditions.checkState(params.isSetOlder_than_millis());
     expireApi.expireOlderThan(params.older_than_millis);
     expireApi.commit();
     return "Snapshots have been expired.";
   }
 
+  /**
+   * Executes an ALTER TABLE EXECUTE ROLLBACK.
+   */
+  public static String alterTableExecuteRollback(
+      Transaction iceTxn, FeIcebergTable tbl, TAlterTableExecuteRollbackParams params) {
+    TRollbackType kind = params.getKind();
+    ManageSnapshots manageSnapshots = iceTxn.manageSnapshots();
+    switch (kind) {
+      case TIME_ID:
+        Preconditions.checkState(params.isSetTimestamp_millis());
+        long timestampMillis = params.getTimestamp_millis();
+        LOG.info("Rollback iceberg table to snapshot before timestamp {}",
+            timestampMillis);
+        manageSnapshots.rollbackToTime(timestampMillis);
+        break;
+      case VERSION_ID:
+        Preconditions.checkState(params.isSetSnapshot_id());
+        long snapshotId = params.getSnapshot_id();
+        LOG.info("Rollback iceberg table to snapshot id {}", snapshotId);
+        manageSnapshots.rollbackTo(snapshotId);
+        break;
+      default: throw new IllegalStateException("Bad kind of execute rollback " + kind);
+    }
+    // Commit the update.
+    manageSnapshots.commit();
+    return "Rollback executed.";
+  }
+
   /**
    * Drops a column from a Iceberg table.
    */
@@ -370,4 +410,4 @@ public class IcebergCatalogOpExecutor {
                     String.valueOf(version));
     updateProps.commit();
   }
-}
+}
\ No newline at end of file
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 6a96ef100..418334be1 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -408,7 +408,8 @@ import org.apache.impala.thrift.TReservedWordsVersion;
         "year", "month", "day", "hour", "minute", "second",
         "begin", "call", "check", "classifier", "close", "identity", "language",
         "localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
-        "period", "result", "return", "sql", "start", "system", "time", "user", "value"
+        "period", "result", "return", "rollback", "sql", "start", "system", "time",
+        "user", "value"
     }));
   }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 5986fb1c8..c4f656d3b 100755
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -4281,8 +4281,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Negative tests
     AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
         "unsupported_operation(123456789);", "'unsupported_operation' is not supported " +
-        "by ALTER TABLE <table> EXECUTE. Supported operation is " +
-        "EXPIRE_SNAPSHOTS(<expression>)");
+        "by ALTER TABLE <table> EXECUTE. Supported operations are: " +
+        "EXPIRE_SNAPSHOTS(<expression>), ROLLBACK(<expression>)");
     AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
         "expire_snapshots(now(), 3);", "EXPIRE_SNAPSHOTS(<expression>) must have one " +
         "parameter: expire_snapshots(now(), 3)");
@@ -4297,6 +4297,49 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " been given to EXPIRE_SNAPSHOTS(<expression>)");
   }
 
+  @Test
+  public void TestAlterExecuteRollback() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('2022-01-04 10:00:00');");
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(123456);");
+    // Timestamp can be an expression.
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(cast('2021-08-09 15:52:45' as timestamp) - interval 2 days + " +
+        "interval 3 hours);");
+
+    // Negative tests
+    AnalysisError("alter table nodb.alltypes execute " +
+        "rollback('2022-01-04 10:00:00');",
+       "Could not resolve table reference: 'nodb.alltypes'");
+    AnalysisError("alter table functional.alltypes execute " +
+        "rollback('2022-01-04 10:00:00');",
+       "ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables: " +
+           "functional.alltypes");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(id);", "EXECUTE ROLLBACK(<expression>): " +
+        "<expression> must be a constant expression: EXECUTE rollback(id)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(3.14);", "EXECUTE ROLLBACK(<expression>): <expression> " +
+        "must be an integer type or a timestamp, but is 'DECIMAL(3,2)': " +
+        "EXECUTE rollback(3.14)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('2021-02-32 15:52:45');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(<expression>): the expression " +
+        "'2021-02-32 15:52:45' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('the beginning');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(<expression>): the expression " +
+        "'the beginning' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(1111,2222);",
+        "EXECUTE ROLLBACK(<expression>): must have one parameter");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('1111');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(<expression>): the expression " +
+        "'1111' cannot be converted to a TIMESTAMP");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
new file mode 100644
index 000000000..4c2c3cd18
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
@@ -0,0 +1,27 @@
+====
+---- QUERY
+# EXECUTE ROLLBACK with an invalid snapshot id on a partitioned Iceberg table.
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK(1)
+---- CATCH
+Cannot roll back to unknown snapshot id: 1
+====
+---- QUERY
+# EXECUTE ROLLBACK to a too old date on a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('2020-08-31 07:58:00')
+---- CATCH
+Cannot roll back, no valid snapshot older than: 1598853480000
+====
+---- QUERY
+# EXECUTE ROLLBACK to an Invalid timestamp expression on a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('1111');
+---- CATCH
+An invalid TIMESTAMP expression has been given to EXECUTE ROLLBACK(<expression>): the expression '1111' cannot be converted to a TIMESTAMP
+====
+---- QUERY
+# EXECUTE ROLLBACK fails on a non-Iceberg table.
+ALTER TABLE functional_parquet.alltypestiny EXECUTE ROLLBACK(1111111)
+---- CATCH
+ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables
+====
\ No newline at end of file
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index dc7ac51c9..0a53c31f7 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -28,6 +28,7 @@ from tests.common.skip import SkipIfFS
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
 from tests.util.filesystem_utils import WAREHOUSE
+from tests.util.iceberg_util import IcebergCatalogs
 
 
 @SkipIfFS.hive
@@ -482,6 +483,7 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
   def test_iceberg_self_events(self, unique_database):
     """This test checks that Impala doesn't refresh Iceberg tables on self events."""
     tbl_name = unique_database + ".test_iceberg_events"
+    iceberg_catalogs = IcebergCatalogs(unique_database)
 
     def check_self_events(query, skips_events=True):
       tbls_refreshed_before, partitions_refreshed_before, \
@@ -495,19 +497,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
       if skips_events:
         assert events_skipped_after > events_skipped_before
 
-    hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
-    hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " +
-        "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
-            unique_database))
-    hive_catalog = "'iceberg.catalog'='hive.catalog'"
-    hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
-    hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
-
-    all_catalogs = [hadoop_tables, hadoop_catalog, hive_catalog, hive_catalogs,
-        hadoop_catalogs]
-
-    for catalog in all_catalogs:
-      is_hive_catalog = catalog == hive_catalog or catalog == hive_catalogs
+    for catalog in iceberg_catalogs.get_iceberg_catalog_properties():
+      is_hive_catalog = iceberg_catalogs.is_a_hive_catalog(catalog)
       self.client.execute("""
           CREATE TABLE {0} (i int) STORED AS ICEBERG
           TBLPROPERTIES ({1})""".format(tbl_name, catalog))
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 6f64ced0e..a8e05baf5 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -22,6 +22,8 @@ import logging
 import os
 import pytest
 import random
+
+import re
 import time
 
 from subprocess import check_call
@@ -33,14 +35,14 @@ import json
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
-from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfLocal
+from tests.common.skip import SkipIf, SkipIfDockerizedCluster
 from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
 from tests.shell.util import run_impala_shell_cmd
 from tests.util.filesystem_utils import get_fs_path, IS_HDFS
 from tests.util.get_parquet_metadata import get_parquet_metadata
-from tests.util.iceberg_util import cast_ts, quote, parse_timestamp
+from tests.util.iceberg_util import cast_ts, quote, get_snapshots, IcebergCatalogs
 
 LOG = logging.getLogger(__name__)
 
@@ -69,58 +71,61 @@ class TestIcebergTable(IcebergTestSuite):
 
   def test_expire_snapshots(self, unique_database):
     tbl_name = unique_database + ".expire_snapshots"
-
-    # We are setting the TIMEZONE query option in this test, so let's create a local
-    # impala client.
-    with self.create_impala_client() as impalad_client:
-      # Iceberg doesn't create a snapshot entry for the initial empty table
-      impalad_client.execute("create table {0} (i int) stored as iceberg"
-          .format(tbl_name))
-      ts_0 = datetime.datetime.now()
-      insert_q = "insert into {0} values (1)".format(tbl_name)
-      ts_1 = self.execute_query_ts(impalad_client, insert_q)
-      time.sleep(5)
-      impalad_client.execute(insert_q)
-      time.sleep(5)
-      ts_2 = self.execute_query_ts(impalad_client, insert_q)
-      impalad_client.execute(insert_q)
-
-      # There should be 4 snapshots initially
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
-      # Expire the oldest snapshot and test that the oldest one was expired
-      expire_q = "alter table {0} execute expire_snapshots({1})"
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
-
-      # Expire with a timestamp in which the interval does not touch existing snapshot
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
-
-      # Expire all, but retain 1
-      impalad_client.execute(expire_q.format(tbl_name,
-          cast_ts(datetime.datetime.now())))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
-
-      # Change number of retained snapshots, then expire all
-      impalad_client.execute("""alter table {0} set tblproperties
-          ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
-      impalad_client.execute(insert_q)
-      impalad_client.execute(insert_q)
-      impalad_client.execute(expire_q.format(tbl_name,
-          cast_ts(datetime.datetime.now())))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
-
-      # Check that timezone is interpreted in local timezone controlled by query option
-      # TIMEZONE.
-      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      impalad_client.execute(insert_q)
-      ts_tokyo = self.impala_now(impalad_client)
-      impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
-      impalad_client.execute(insert_q)
-      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+    iceberg_catalogs = IcebergCatalogs(unique_database)
+    for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+      # We are setting the TIMEZONE query option in this test, so let's create a local
+      # impala client.
+      with self.create_impala_client() as impalad_client:
+        # Iceberg doesn't create a snapshot entry for the initial empty table
+        impalad_client.execute("""
+          create table {0} (i int) stored as iceberg
+          TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+        ts_0 = datetime.datetime.now()
+        insert_q = "insert into {0} values (1)".format(tbl_name)
+        ts_1 = self.execute_query_ts(impalad_client, insert_q)
+        time.sleep(5)
+        impalad_client.execute(insert_q)
+        time.sleep(5)
+        ts_2 = self.execute_query_ts(impalad_client, insert_q)
+        impalad_client.execute(insert_q)
+
+        # There should be 4 snapshots initially
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
+        # Expire the oldest snapshot and test that the oldest one was expired
+        expire_q = "alter table {0} execute expire_snapshots({1})"
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
+
+        # Expire with a timestamp in which the interval does not touch existing snapshot
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+
+        # Expire all, but retain 1
+        impalad_client.execute(expire_q.format(tbl_name,
+            cast_ts(datetime.datetime.now())))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
+
+        # Change number of retained snapshots, then expire all
+        impalad_client.execute("""alter table {0} set tblproperties
+            ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
+        impalad_client.execute(insert_q)
+        impalad_client.execute(insert_q)
+        impalad_client.execute(expire_q.format(tbl_name,
+            cast_ts(datetime.datetime.now())))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
+
+        # Check that timezone is interpreted in local timezone controlled by query option
+        # TIMEZONE.
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        impalad_client.execute(insert_q)
+        ts_tokyo = self.impala_now(impalad_client)
+        impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
+        impalad_client.execute(insert_q)
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+        impalad_client.execute("DROP TABLE {0}".format(tbl_name))
 
   def test_truncate_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
@@ -128,7 +133,7 @@ class TestIcebergTable(IcebergTestSuite):
   # With IMPALA-11429 there is an extra "ALTER TABLE SET OWNER" right after executing
   # "CREATE TABLE". As a result dropping the table location right after CREATE TABLE will
   # trigger a known bug: IMPALA-11509. Hence, turning this test off until there is a fix
-  # for this issue. Note, we could add a sleep righ after table creation that could
+  # for this issue. Note, we could add a sleep right after table creation that could
   # workaround the above mentioned bug but then we would hit another issue: IMPALA-11502.
   @SkipIf.not_dfs
   def test_drop_incomplete_table(self, vector, unique_database):
@@ -231,31 +236,127 @@ class TestIcebergTable(IcebergTestSuite):
         tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name))
     self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name))
     self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name))
-    result = self.client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
-    assert(len(result.data) == 2)
-    first_snapshot = result.data[0].split("\t")
-    second_snapshot = result.data[1].split("\t")
+    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2)
+    first_snapshot = snapshots[0]
+    second_snapshot = snapshots[1]
     # Check that first snapshot is older than the second snapshot.
-    assert(first_snapshot[0] < second_snapshot[0])
+    assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time())
     # Check that second snapshot's parent ID is the snapshot ID of the first snapshot.
-    assert(first_snapshot[1] == second_snapshot[2])
+    assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id())
     # The first snapshot has no parent snapshot ID.
-    assert(first_snapshot[2] == "NULL")
+    assert(first_snapshot.get_parent_id() is None)
     # Check "is_current_ancestor" column.
-    assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE")
+    assert(first_snapshot.is_current_ancestor())
+    assert(second_snapshot.is_current_ancestor())
+
+  def test_execute_rollback_negative(self, vector):
+    """Negative test for EXECUTE ROLLBACK."""
+    self.run_test_case('QueryTest/iceberg-rollback-negative', vector)
+
+  def test_execute_rollback(self, unique_database):
+    """Test for EXECUTE ROLLBACK."""
+    iceberg_catalogs = IcebergCatalogs(unique_database)
+    for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+      # Create a table with multiple snapshots.
+      tbl_name = unique_database + ".iceberg_execute_rollback"
+      # We are setting the TIMEZONE query option in this test, so let's create a local
+      # impala client.
+      with self.create_impala_client() as impalad_client:
+        impalad_client.execute("""
+          create table {0} (i int) stored as iceberg
+          TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+        initial_snapshots = 3
+        for i in range(initial_snapshots):
+          impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i))
+        snapshots = get_snapshots(impalad_client, tbl_name,
+          expected_result_size=initial_snapshots)
+
+        output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id())
+        LOG.info("success output={0}".format(output))
+
+        # We rolled back, but that creates a new snapshot, so now there are 4.
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4)
+        # The new snapshot has the same id (and parent id) as the snapshot we rolled back
+        # to, but it has a different creation time.
+        assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id()
+        assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id()
+        assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time()
+        # The "orphaned" snapshot is now not a current ancestor.
+        assert not snapshots[2].is_current_ancestor()
+
+        # We cannot roll back to a snapshot that is not a current ancestor.
+        output = self.rollback_to_id_expect_failure(tbl_name,
+            snapshots[2].get_snapshot_id(),
+            expected_text="Cannot roll back to snapshot, not an ancestor of the current "
+                          "state")
+
+        # Create another snapshot.
+        before_insert = datetime.datetime.now()
+        impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+
+        # Rollback to before the last insert.
+        self.rollback_to_ts(impalad_client, tbl_name, before_insert)
+        # This creates another snapshot.
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6)
+        # The snapshot id is the same, the dates differ
+        assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id()
+        assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time()
+        assert not snapshots[4].is_current_ancestor()
+
+        # Show that the EXECUTE ROLLBACK is respecting the current timezone.
+        # To do this we try to roll back to a time for which there is no
+        # snapshot, this will fail with an error message that includes the specified
+        # time. We parse out that time. By doing this in two timezones we can see
+        # that the parameter being used was affected by the current timezone.
+        one_hour_ago = before_insert - datetime.timedelta(hours=1)
+        # We use Timezones from Japan and Iceland to avoid any DST complexities.
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        japan_ts = self.get_snapshot_ts_from_failed_rollback(
+            impalad_client, tbl_name, one_hour_ago)
+        impalad_client.execute("SET TIMEZONE='Iceland'")
+        iceland_ts = self.get_snapshot_ts_from_failed_rollback(
+            impalad_client, tbl_name, one_hour_ago)
+        diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60)
+        assert diff_hours == 9
+
+        impalad_client.execute("DROP TABLE {0}".format(tbl_name))
+
+  def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts):
+    """Run an EXECUTE ROLLBACK which is expected to fail.
+    Parse the error message to extract the timestamp for which there
+    was no snapshot, and convert the string to an integer"""
+    try:
+      self.rollback_to_ts(client, tbl_name, ts)
+      assert False, "Query should have failed"
+    except ImpalaBeeswaxException as e:
+      result = re.search(r".*no valid snapshot older than: (\d+)", str(e))
+      time_str = result.group(1)
+      snapshot_ts = int(time_str)
+      assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result)
+      return snapshot_ts
+
+  def rollback_to_ts(self, client, tbl_name, ts):
+    """Rollback a table to a snapshot timestamp."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat())
+    return self.execute_query_expect_success(client, query)
+
+  def rollback_to_id(self, tbl_name, id):
+    """Rollback a table to a snapshot id."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+    return self.execute_query_expect_success(self.client, query)
+
+  def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None):
+    """Attempt to roll back a table to a snapshot id, expecting a failure."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+    output = self.execute_query_expect_failure(self.client, query)
+    if expected_text:
+      assert expected_text in str(output)
+    return output
 
   def test_describe_history_params(self, unique_database):
     tbl_name = unique_database + ".describe_history"
 
-    def expect_results_between(ts_start, ts_end, expected_result_size):
-      query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format(
-        tbl_name, cast_ts(ts_start), cast_ts(ts_end))
-      data = impalad_client.execute(query)
-      assert len(data.data) == expected_result_size
-      for i in range(len(data.data)):
-        result_ts_dt = parse_timestamp(data.data[i].split('\t')[0])
-        assert result_ts_dt >= ts_start and result_ts_dt <= ts_end
-
     # We are setting the TIMEZONE query option in this test, so let's create a local
     # impala client.
     with self.create_impala_client() as impalad_client:
@@ -279,10 +380,11 @@ class TestIcebergTable(IcebergTestSuite):
       self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
 
       # Describe history with BETWEEN <ts> AND <ts> predicate
-      expect_results_between(ts_1, ts_2, 1)
-      expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2, 2)
-      expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2 +
-          datetime.timedelta(hours=1), 3)
+      self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1)
+      self.expect_results_between(impalad_client, tbl_name,
+          ts_1 - datetime.timedelta(hours=1), ts_2, 2)
+      self.expect_results_between(impalad_client, tbl_name,
+          ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3)
 
       # Check that timezone is interpreted in local timezone controlled by query option
       # TIMEZONE. Persist the local times first and create a new snapshot.
@@ -341,14 +443,6 @@ class TestIcebergTable(IcebergTestSuite):
               tbl_name, snapshot_id),
           expected)
 
-    def get_snapshots():
-      data = impalad_client.execute("describe history {0}".format(tbl_name))
-      ret = list()
-      for row in data.data:
-        fields = row.split('\t')
-        ret.append(fields[1])
-      return ret
-
     def impala_now():
       now_data = impalad_client.execute("select now()")
       return now_data.data[0]
@@ -409,12 +503,12 @@ class TestIcebergTable(IcebergTestSuite):
                        ij_cols)
 
       # Query table as of snapshot IDs.
-      snapshots = get_snapshots()
-      expect_results_v(snapshots[0], ['1'], i_cols)
-      expect_results_v(snapshots[1], ['1', '2'], i_cols)
-      expect_results_v(snapshots[2], [], i_cols)
-      expect_results_v(snapshots[3], ['100'], i_cols)
-      expect_results_v(snapshots[4], ['100\tNULL', '3\t103'], ij_cols)
+      snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+      expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols)
+      expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols)
+      expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols)
+      expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols)
+      expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols)
 
       # Test of plain count star optimization
       # 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile
@@ -427,11 +521,11 @@ class TestIcebergTable(IcebergTestSuite):
       expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
       expect_for_count_star_t(cast_ts(ts_5), '2')
       expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2')
-      expect_for_count_star_v(snapshots[0], '1')
-      expect_for_count_star_v(snapshots[1], '2')
-      expect_for_count_star_v(snapshots[2], '0')
-      expect_for_count_star_v(snapshots[3], '1')
-      expect_for_count_star_v(snapshots[4], '2')
+      expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1')
+      expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2')
+      expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0')
+      expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1')
+      expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2')
 
       # SELECT diff
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
@@ -442,18 +536,19 @@ class TestIcebergTable(IcebergTestSuite):
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
                         MINUS
                         SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
-                     tbl=tbl_name, v_new=snapshots[1], v_old=snapshots[0]),
+                     tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(),
+                     v_old=snapshots[0].get_snapshot_id()),
                      ['2'], i_cols)
       # Mix SYSTEM_TIME and SYSTEM_VERSION
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
                         MINUS
                         SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
-                     tbl=tbl_name, v_new=snapshots[1], ts_old=ts_1),
+                     tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1),
                      ['2'], i_cols)
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
                         MINUS
                         SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
-                     tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0]),
+                     tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()),
                      ['2'], i_cols)
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
                         MINUS
diff --git a/tests/util/iceberg_util.py b/tests/util/iceberg_util.py
index 924b5e249..21703acfd 100644
--- a/tests/util/iceberg_util.py
+++ b/tests/util/iceberg_util.py
@@ -106,3 +106,34 @@ def get_snapshots(impalad_client, tbl_name, ts_start=None, ts_end=None,
   for snapshot_str in rows.data:
     results.append(Snapshot(snapshot_str))
   return results
+
+
+class IcebergCatalogs:
+  """Utility class to generate TBLPROPERTIES corresponding to various iceberg catalogs."""
+
+  def __init__(self, database):
+    """Create a IcebergCatalogs object parameterized by database name."""
+    self.database = database
+
+  hive_catalog = "'iceberg.catalog'='hive.catalog'"
+  hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
+  hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
+  hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
+
+  def get_iceberg_catalog_properties(self):
+    """Return a list containing TBLPROPERTIES corresponding to various iceberg catalogs.
+     The TBLPROPERTIES can be used to create tables."""
+    hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', "
+        + "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
+          self.database))
+    return [
+      self.hadoop_tables,
+      self.hive_catalog,
+      self.hive_catalogs,
+      self.hadoop_catalogs,
+      hadoop_catalog
+    ]
+
+  def is_a_hive_catalog(self, catalog):
+    """Return true if the catalog property is for a Hive catalog."""
+    return catalog == self.hive_catalog or catalog == self.hive_catalogs