You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2023/03/10 16:35:01 UTC
[impala] 01/02: IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.
This is an automated email from the ASF dual-hosted git repository.
asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 29586d6631d97acf13f208564db7c8c072154019
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Tue Sep 13 18:29:57 2022 -0700
IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.
Iceberg table modifications cause new table snapshots to be created;
earlier snapshots represent previous versions of the table. The Iceberg
API provides a way to rollback the table to a previous snapshot.
This change adds the ability to execute a rollback on Iceberg tables
using the following statements:
- ALTER TABLE <tbl> EXECUTE ROLLBACK(<snapshot id>)
- ALTER TABLE <tbl> EXECUTE ROLLBACK('<timestamp>')
The latter form of the command rolls back to the most recent snapshot
that has a creation timestamp that is older than the specified
timestamp.
Note that when a table is rolled back to a snapshot, a new snapshot is
created with the same snapshot id, but with a new creation timestamp.
Testing:
- Added analysis unit tests.
- Added e2e tests.
- Converted test_time_travel to use get_snapshots() from iceberg_util.
- Added a utility class to allow pytests to create tables with various
Iceberg catalogs.
Change-Id: Ic74913d3b81103949ffb5eef7cc936303494f8b9
Reviewed-on: http://gerrit.cloudera.org:8080/19002
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
common/thrift/JniCatalog.thrift | 34 ++-
fe/src/main/cup/sql-parser.cup | 3 +-
...a => AlterTableExecuteExpireSnapshotsStmt.java} | 93 +++----
.../analysis/AlterTableExecuteRollbackStmt.java | 147 +++++++++++
.../impala/analysis/AlterTableExecuteStmt.java | 107 +++-----
.../java/org/apache/impala/catalog/FeFsTable.java | 2 +-
.../apache/impala/service/CatalogOpExecutor.java | 25 +-
.../impala/service/IcebergCatalogOpExecutor.java | 56 +++-
fe/src/main/jflex/sql-scanner.flex | 3 +-
.../org/apache/impala/analysis/AnalyzeDDLTest.java | 47 +++-
.../QueryTest/iceberg-rollback-negative.test | 27 ++
tests/custom_cluster/test_events_custom_configs.py | 17 +-
tests/query_test/test_iceberg.py | 291 ++++++++++++++-------
tests/util/iceberg_util.py | 31 +++
14 files changed, 618 insertions(+), 265 deletions(-)
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index c41762a6f..56398eaf0 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -414,12 +414,38 @@ struct TAlterTableSetPartitionSpecParams {
1: required CatalogObjects.TIcebergPartitionSpec partition_spec
}
-// Parameters for ALTER TABLE EXECUTE operations.
-struct TAlterTableExecuteParams {
- // The parameter of the ExpireSnapshot.expireOlderThan(timestampMillis) Iceberg call.
+// Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS operations.
+struct TAlterTableExecuteExpireSnapshotsParams {
1: required i64 older_than_millis
}
+// ALTER TABLE EXECUTE ROLLBACK can be to a date or snapshot id.
+enum TRollbackType {
+ TIME_ID = 0
+ VERSION_ID = 1
+}
+
+// Parameters for ALTER TABLE EXECUTE ROLLBACK operations.
+struct TAlterTableExecuteRollbackParams {
+ // Is rollback to a date or snapshot id.
+ 1: required TRollbackType kind
+
+ // If kind is TIME_ID this is the date to rollback to.
+ 2: optional i64 timestamp_millis
+
+ // If kind is VERSION_ID this is the id to rollback to.
+ 3: optional i64 snapshot_id
+}
+
+// Parameters for ALTER TABLE EXECUTE ... operations.
+struct TAlterTableExecuteParams {
+ // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
+ 1: optional TAlterTableExecuteExpireSnapshotsParams expire_snapshots_params
+
+ // Parameters for ALTER TABLE EXECUTE ROLLBACK
+ 2: optional TAlterTableExecuteRollbackParams execute_rollback_params
+}
+
// Parameters for all ALTER TABLE commands.
struct TAlterTableParams {
1: required TAlterTableType alter_type
@@ -478,7 +504,7 @@ struct TAlterTableParams {
// Parameters for ALTER TABLE SET PARTITION SPEC
19: optional TAlterTableSetPartitionSpecParams set_partition_spec_params
- // Parameters for ALTER TABLE EXECUTE
+ // Parameters for ALTER TABLE EXECUTE operations
20: optional TAlterTableExecuteParams set_execute_params
}
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index dcf8eba5c..b136a4631 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -37,6 +37,7 @@ import org.apache.impala.analysis.SetOperationStmt.SetOperator;
import org.apache.impala.analysis.RangePartition;
import org.apache.impala.analysis.TableSampleClause;
import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
+import org.apache.impala.analysis.AlterTableExecuteStmt;
import org.apache.impala.analysis.IcebergPartitionSpec;
import org.apache.impala.analysis.IcebergPartitionField;
import org.apache.impala.analysis.IcebergPartitionTransform;
@@ -1361,7 +1362,7 @@ alter_tbl_stmt ::=
:}
| KW_ALTER KW_TABLE table_name:table KW_EXECUTE function_call_expr:expr
{:
- RESULT = new AlterTableExecuteStmt(table, expr);
+ RESULT = AlterTableExecuteStmt.createExecuteStmt(table, expr);
:}
;
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
similarity index 54%
copy from fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
copy to fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
index 141aa386d..16d8a3753 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
@@ -17,10 +17,13 @@
package org.apache.impala.analysis;
+import com.google.common.base.Preconditions;
+
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableType;
@@ -28,59 +31,47 @@ import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
- * Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
- * tables, supported operations:
- * - expire_snapshots(<timestamp>): uses the ExpireSnapshot API to expire snaphosts,
- * calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
- * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
- * should be retained even when all snapshots are selected by expireOlderThan().
+ * Represents an ALTER TABLE <tbl> EXECUTE EXPIRE_SNAPSHOTS(<parameters>) statement on
+ * Iceberg tables, the parameter is (<timestamp>).
*/
-public class AlterTableExecuteStmt extends AlterTableStmt {
- private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
-
- private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
+public class AlterTableExecuteExpireSnapshotsStmt extends AlterTableExecuteStmt {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(AlterTableExecuteExpireSnapshotsStmt.class);
- // Expression of the function call after EXECUTE keyword. Parsed into an operation and
- // a value of that operation.
- private FunctionCallExpr fnCallExpr_;
+ protected final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
- // Value expression from fnCallExpr_.
- private Expr fnParamValue_;
-
- // The value after extracted from fnParamValue_ expression.
- private long olderThanMillis_ = -1;
-
- protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
- super(tableName);
- fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+ protected AlterTableExecuteExpireSnapshotsStmt(TableName tableName, Expr fnCallExpr) {
+ super(tableName, fnCallExpr);
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
- analyzeFunctionCallExpr(analyzer);
+ analyzeFunctionCallExpr(analyzer, USAGE);
analyzeOlderThan(analyzer);
}
- private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
- // fnCallExpr_ analyzed here manually, because it is not an actual function but a
- // catalog operation.
- String fnName = fnCallExpr_.getFnName().toString();
- if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
- throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
- "TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
- }
- if (fnCallExpr_.getParams().size() != 1) {
- throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
- }
- fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
+ @Override
+ public String toSql(ToSqlOptions options) {
+ return fnCallExpr_.toSql();
}
- private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
+ @Override
+ public TAlterTableParams toThrift() {
+ TAlterTableParams params = super.toThrift();
+ params.setAlter_type(TAlterTableType.EXECUTE);
+ TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+ TAlterTableExecuteExpireSnapshotsParams executeExpireSnapshotsParams =
+ new TAlterTableExecuteExpireSnapshotsParams();
+ executeParams.setExpire_snapshots_params(executeExpireSnapshotsParams);
+ executeExpireSnapshotsParams.setOlder_than_millis(olderThanMillis_);
+ params.setSet_execute_params(executeParams);
+ return params;
+ }
+
+ protected void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
Preconditions.checkNotNull(fnParamValue_);
fnParamValue_.analyze(analyzer);
if (!fnParamValue_.isConstant()) {
@@ -90,31 +81,17 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
}
if (!fnParamValue_.getType().isTimestamp()) {
- throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
- fnParamValue_.getType() + "': " + fnParamValue_.toSql());
+ throw new AnalysisException(USAGE + " must be a timestamp type but is '"
+ + fnParamValue_.getType() + "': " + fnParamValue_.toSql());
}
try {
olderThanMillis_ =
ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
- LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
+ LOG.debug(USAGE + " millis: " + olderThanMillis_);
} catch (InternalException ie) {
- throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
- USAGE + ": " + ie.getMessage(), ie);
+ throw new AnalysisException("Invalid TIMESTAMP expression has been given to "
+ + USAGE + ": " + ie.getMessage(),
+ ie);
}
}
-
- @Override
- public String toSql(ToSqlOptions options) {
- return fnCallExpr_.toSql();
- }
-
- @Override
- public TAlterTableParams toThrift() {
- TAlterTableParams params = super.toThrift();
- params.setAlter_type(TAlterTableType.EXECUTE);
- TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
- executeParams.setOlder_than_millis(olderThanMillis_);
- params.setSet_execute_params(executeParams);
- return params;
- }
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
new file mode 100644
index 000000000..ea4da0b67
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TRollbackType;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an ALTER TABLE EXECUTE ROLLBACK(<parameter>) statement.
+ * The parameter can be a snapshot id, or a timestamp.
+ * A rollback to a snapshot id causes a new snapshot to be
+ * created with the same snapshot id, but with a new creation timestamp.
+ * A rollback to a timestamp rolls back to the latest snapshot
+ * that has a creation timestamp that is older than the specified
+ * timestamp.
+ */
+public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
+ public static final String USAGE = "EXECUTE ROLLBACK(<expression>):";
+ private final static Logger LOG =
+ LoggerFactory.getLogger(AlterTableExecuteRollbackStmt.class);
+ private long snapshotVersion_;
+ private TRollbackType kind_;
+
+ public AlterTableExecuteRollbackStmt(TableName tableName, FunctionCallExpr fnCallExpr) {
+ super(tableName, fnCallExpr);
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ super.analyze(analyzer);
+ FeTable table = getTargetTable();
+ if (!(table instanceof FeIcebergTable)) {
+ throw new AnalysisException("ALTER TABLE EXECUTE ROLLBACK is only supported "
+ + "for Iceberg tables: " + table.getTableName());
+ }
+ analyzeFunctionCallExpr(analyzer, USAGE);
+ analyzeParameter(analyzer);
+ }
+
+ private void analyzeParameter(Analyzer analyzer) throws AnalysisException {
+ Preconditions.checkNotNull(fnParamValue_);
+ fnParamValue_.analyze(analyzer);
+
+ if (!fnParamValue_.isConstant()) {
+ throw new AnalysisException(
+ USAGE + " <expression> must be a constant expression: EXECUTE " + toSql());
+ }
+ if ((fnParamValue_ instanceof LiteralExpr)
+ && (fnParamValue_.getType().isIntegerType())) {
+ // Parameter is a snapshot id
+ kind_ = TRollbackType.VERSION_ID;
+ snapshotVersion_ = fnParamValue_.evalToInteger(analyzer, USAGE);
+ if (snapshotVersion_ < 0) {
+ throw new AnalysisException("Invalid version number has been given to " + USAGE
+ + ": " + snapshotVersion_);
+ }
+ LOG.debug(USAGE + " version: " + snapshotVersion_);
+ } else {
+ Expr timestampEpr = getParamConvertibleToTimestamp();
+ if (timestampEpr != null) {
+ // Parameter is a timestamp.
+ kind_ = TRollbackType.TIME_ID;
+ try {
+ olderThanMillis_ =
+ ExprUtil.localTimestampToUnixTimeMicros(analyzer, timestampEpr) / 1000;
+ LOG.debug(USAGE + " millis: " + olderThanMillis_);
+ } catch (InternalException ie) {
+ throw new AnalysisException("An invalid TIMESTAMP expression has been given "
+ + "to " + USAGE + " the expression " + fnParamValue_.toSql()
+ + " cannot be converted to a TIMESTAMP",
+ ie);
+ }
+ } else {
+ throw new AnalysisException(USAGE
+ + " <expression> must be an integer type or a timestamp, but is '"
+ + fnParamValue_.getType() + "': EXECUTE " + toSql());
+ }
+ }
+ }
+
+ /**
+ * If field fnParamValue_ is a Timestamp, or can be cast to a Timestamp,
+ * then return an Expr for the Timestamp.
+ * @return null if the fnParamValue_ cannot be converted to a Timestamp.
+ */
+ private Expr getParamConvertibleToTimestamp() {
+ Expr timestampExpr = fnParamValue_;
+ if (timestampExpr.getType().isStringType()) {
+ timestampExpr = new CastExpr(Type.TIMESTAMP, fnParamValue_);
+ }
+ if (timestampExpr.getType().isTimestamp()) {
+ return timestampExpr;
+ }
+ return null;
+ }
+
+ @Override
+ public String toSql(ToSqlOptions options) {
+ return fnCallExpr_.toSql();
+ }
+
+ @Override
+ public TAlterTableParams toThrift() {
+ TAlterTableParams params = super.toThrift();
+ params.setAlter_type(TAlterTableType.EXECUTE);
+ TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+ TAlterTableExecuteRollbackParams executeRollbackParams =
+ new TAlterTableExecuteRollbackParams();
+ executeParams.setExecute_rollback_params(executeRollbackParams);
+ executeRollbackParams.setKind(kind_);
+ switch (kind_) {
+ case TIME_ID: executeRollbackParams.setTimestamp_millis(olderThanMillis_); break;
+ case VERSION_ID: executeRollbackParams.setSnapshot_id(snapshotVersion_); break;
+ default: throw new IllegalStateException("Bad kind of execute rollback " + kind_);
+ }
+ params.setSet_execute_params(executeParams);
+ return params;
+ }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
index 141aa386d..0a0e33206 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
@@ -17,104 +17,63 @@
package org.apache.impala.analysis;
-import org.apache.impala.catalog.FeIcebergTable;
-import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.InternalException;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
-import org.apache.impala.thrift.TAlterTableParams;
-import org.apache.impala.thrift.TAlterTableType;
-import org.apache.impala.util.ExprUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.impala.common.AnalysisException;
+
/**
* Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
- * tables, supported operations:
- * - expire_snapshots(<timestamp>): uses the ExpireSnapshot API to expire snaphosts,
- * calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
- * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
- * should be retained even when all snapshots are selected by expireOlderThan().
+ * tables. For supported operations see the subclasses.
*/
public class AlterTableExecuteStmt extends AlterTableStmt {
- private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
-
- private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
// Expression of the function call after EXECUTE keyword. Parsed into an operation and
// a value of that operation.
- private FunctionCallExpr fnCallExpr_;
-
+ protected FunctionCallExpr fnCallExpr_;
// Value expression from fnCallExpr_.
- private Expr fnParamValue_;
-
+ protected Expr fnParamValue_;
// The value after extracted from fnParamValue_ expression.
- private long olderThanMillis_ = -1;
+ protected long olderThanMillis_ = -1;
protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
super(tableName);
- fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+ fnCallExpr_ = (FunctionCallExpr) fnCallExpr;
}
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- super.analyze(analyzer);
- Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
- analyzeFunctionCallExpr(analyzer);
- analyzeOlderThan(analyzer);
+ /**
+ * Return an instance of a subclass of AlterTableExecuteStmt that can analyze the
+ * execute statement for the function call expression in 'expr'.
+ */
+ public static AlterTableStmt createExecuteStmt(TableName tableName, Expr expr)
+ throws AnalysisException {
+ FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
+ String functionNameOrig = fnCallExpr.getFnName().toString();
+ String functionName = functionNameOrig.toUpperCase();
+ switch (functionName) {
+ case "EXPIRE_SNAPSHOTS":
+ return new AlterTableExecuteExpireSnapshotsStmt(tableName, fnCallExpr);
+ case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
+ default:
+ throw new AnalysisException(String.format("'%s' is not supported by ALTER "
+ + "TABLE <table> EXECUTE. Supported operations are: "
+ + "EXPIRE_SNAPSHOTS(<expression>), "
+ + "ROLLBACK(<expression>).",
+ functionNameOrig));
+ }
}
- private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
+ protected void analyzeFunctionCallExpr(Analyzer ignoredAnalyzer, String usage)
+ throws AnalysisException {
// fnCallExpr_ analyzed here manually, because it is not an actual function but a
// catalog operation.
String fnName = fnCallExpr_.getFnName().toString();
- if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
- throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
- "TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
- }
+ Preconditions.checkState(
+ StringUtils.equalsAnyIgnoreCase(fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK"));
if (fnCallExpr_.getParams().size() != 1) {
- throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
+ throw new AnalysisException(usage + " must have one parameter: " + toSql());
}
fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
}
- private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
- Preconditions.checkNotNull(fnParamValue_);
- fnParamValue_.analyze(analyzer);
- if (!fnParamValue_.isConstant()) {
- throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
- }
- if (fnParamValue_.getType().isStringType()) {
- fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
- }
- if (!fnParamValue_.getType().isTimestamp()) {
- throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
- fnParamValue_.getType() + "': " + fnParamValue_.toSql());
- }
- try {
- olderThanMillis_ =
- ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
- LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
- } catch (InternalException ie) {
- throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
- USAGE + ": " + ie.getMessage(), ie);
- }
- }
-
- @Override
- public String toSql(ToSqlOptions options) {
- return fnCallExpr_.toSql();
- }
-
- @Override
- public TAlterTableParams toThrift() {
- TAlterTableParams params = super.toThrift();
- params.setAlter_type(TAlterTableType.EXECUTE);
- TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
- executeParams.setOlder_than_millis(olderThanMillis_);
- params.setSet_execute_params(executeParams);
- return params;
- }
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 28a82ac03..e2fa4e713 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -178,7 +178,7 @@ public interface FeFsTable extends FeTable {
Map<Long, ? extends PrunablePartition> getPartitionMap();
/**
- * @param the index of the target partitioning column
+ * @param col the index of the target partitioning column
* @return a map from value to a set of partitions for which column 'col'
* has that value.
*/
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3830ffa7e..6e2a3f478 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -165,6 +165,7 @@ import org.apache.impala.thrift.TAlterTableAddPartitionParams;
import org.apache.impala.thrift.TAlterTableAlterColParams;
import org.apache.impala.thrift.TAlterTableDropColParams;
import org.apache.impala.thrift.TAlterTableDropPartitionParams;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableOrViewSetOwnerParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableReplaceColsParams;
@@ -1367,9 +1368,23 @@ public class CatalogOpExecutor {
break;
case EXECUTE:
Preconditions.checkState(params.isSetSet_execute_params());
- String summary = IcebergCatalogOpExecutor.alterTableExecute(iceTxn,
- params.getSet_execute_params());
- addSummary(response, summary);
+ // All the EXECUTE functions operate only on Iceberg data.
+ needsToUpdateHms = false;
+ TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
+ if (setExecuteParams.isSetExecute_rollback_params()) {
+ String rollbackSummary = IcebergCatalogOpExecutor.alterTableExecuteRollback(
+ iceTxn, tbl, setExecuteParams.getExecute_rollback_params());
+ addSummary(response, rollbackSummary);
+ } else if (setExecuteParams.isSetExpire_snapshots_params()) {
+ String expireSummary =
+ IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(
+ iceTxn, setExecuteParams.getExpire_snapshots_params());
+ addSummary(response, expireSummary);
+ } else {
+ // Cannot happen, but throw just in case.
+ throw new IllegalStateException(
+ "Alter table execute statement is not implemented.");
+ }
break;
case SET_PARTITION_SPEC:
// Set partition spec uses 'TableOperations', not transactions.
@@ -1393,7 +1408,7 @@ public class CatalogOpExecutor {
break;
case REPLACE_COLUMNS:
// It doesn't make sense to replace all the columns of an Iceberg table as it
- // would basically make all existing data unaccessible.
+ // would basically make all existing data inaccessible.
default:
throw new UnsupportedOperationException(
"Unsupported ALTER TABLE operation for Iceberg tables: " +
@@ -1414,7 +1429,7 @@ public class CatalogOpExecutor {
if (!needsToUpdateHms) {
// We don't need to update HMS because either it is already done by Iceberg's
- // HiveCatalog, or we modified the PARTITION SPEC which is not stored in HMS.
+ // HiveCatalog, or we modified the Iceberg data which is not stored in HMS.
loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " +
params.getAlter_type().name());
catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 75aa6e9a0..10d19f7c5 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -23,16 +23,17 @@ import java.util.List;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.BaseReplacePartitions;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotManager;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -42,32 +43,36 @@ import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
import org.apache.impala.catalog.iceberg.IcebergCatalog;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.fb.FbIcebergColumnStats;
import org.apache.impala.fb.FbIcebergDataFile;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TIcebergCatalog;
import org.apache.impala.thrift.TIcebergOperationParam;
import org.apache.impala.thrift.TIcebergPartitionSpec;
+import org.apache.impala.thrift.TRollbackType;
import org.apache.impala.util.IcebergSchemaConverter;
import org.apache.impala.util.IcebergUtil;
-import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a helper for the CatalogOpExecutor to provide Iceberg related DDL functionality
* such as creating and dropping tables from Iceberg.
*/
public class IcebergCatalogOpExecutor {
- public static final Logger LOG = Logger.getLogger(IcebergCatalogOpExecutor.class);
+ public static final Logger LOG =
+ LoggerFactory.getLogger(IcebergCatalogOpExecutor.class);
/**
* Create Iceberg table by Iceberg api
@@ -177,14 +182,49 @@ public class IcebergCatalogOpExecutor {
tableOp.commit(metadata, newMetadata);
}
- public static String alterTableExecute(Transaction txn,
- TAlterTableExecuteParams params) {
+ /**
+ * Use the ExpireSnapshot API to expire snapshots by calling the
+ * ExpireSnapshot.expireOlderThan(timestampMillis) method.
+ * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
+ * should be retained even when all snapshots are selected by expireOlderThan().
+ */
+ public static String alterTableExecuteExpireSnapshots(
+ Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) {
ExpireSnapshots expireApi = txn.expireSnapshots();
+ Preconditions.checkState(params.isSetOlder_than_millis());
expireApi.expireOlderThan(params.older_than_millis);
expireApi.commit();
return "Snapshots have been expired.";
}
+ /**
+ * Executes an ALTER TABLE EXECUTE ROLLBACK.
+ */
+ public static String alterTableExecuteRollback(
+ Transaction iceTxn, FeIcebergTable tbl, TAlterTableExecuteRollbackParams params) {
+ TRollbackType kind = params.getKind();
+ ManageSnapshots manageSnapshots = iceTxn.manageSnapshots();
+ switch (kind) {
+ case TIME_ID:
+ Preconditions.checkState(params.isSetTimestamp_millis());
+ long timestampMillis = params.getTimestamp_millis();
+ LOG.info("Rollback iceberg table to snapshot before timestamp {}",
+ timestampMillis);
+ manageSnapshots.rollbackToTime(timestampMillis);
+ break;
+ case VERSION_ID:
+ Preconditions.checkState(params.isSetSnapshot_id());
+ long snapshotId = params.getSnapshot_id();
+ LOG.info("Rollback iceberg table to snapshot id {}", snapshotId);
+ manageSnapshots.rollbackTo(snapshotId);
+ break;
+ default: throw new IllegalStateException("Bad kind of execute rollback " + kind);
+ }
+ // Commit the update.
+ manageSnapshots.commit();
+ return "Rollback executed.";
+ }
+
/**
* Drops a column from a Iceberg table.
*/
@@ -370,4 +410,4 @@ public class IcebergCatalogOpExecutor {
String.valueOf(version));
updateProps.commit();
}
-}
+}
\ No newline at end of file
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 6a96ef100..418334be1 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -408,7 +408,8 @@ import org.apache.impala.thrift.TReservedWordsVersion;
"year", "month", "day", "hour", "minute", "second",
"begin", "call", "check", "classifier", "close", "identity", "language",
"localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
- "period", "result", "return", "sql", "start", "system", "time", "user", "value"
+ "period", "result", "return", "rollback", "sql", "start", "system", "time",
+ "user", "value"
}));
}
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 5986fb1c8..c4f656d3b 100755
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -4281,8 +4281,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Negative tests
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"unsupported_operation(123456789);", "'unsupported_operation' is not supported " +
- "by ALTER TABLE <table> EXECUTE. Supported operation is " +
- "EXPIRE_SNAPSHOTS(<expression>)");
+ "by ALTER TABLE <table> EXECUTE. Supported operations are: " +
+ "EXPIRE_SNAPSHOTS(<expression>), ROLLBACK(<expression>)");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"expire_snapshots(now(), 3);", "EXPIRE_SNAPSHOTS(<expression>) must have one " +
"parameter: expire_snapshots(now(), 3)");
@@ -4297,6 +4297,49 @@ public class AnalyzeDDLTest extends FrontendTestBase {
" been given to EXPIRE_SNAPSHOTS(<expression>)");
}
+ @Test
+ public void TestAlterExecuteRollback() {
+ AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback('2022-01-04 10:00:00');");
+ AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback(123456);");
+ // Timestamp can be an expression.
+ AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback(cast('2021-08-09 15:52:45' as timestamp) - interval 2 days + " +
+ "interval 3 hours);");
+
+ // Negative tests
+ AnalysisError("alter table nodb.alltypes execute " +
+ "rollback('2022-01-04 10:00:00');",
+ "Could not resolve table reference: 'nodb.alltypes'");
+ AnalysisError("alter table functional.alltypes execute " +
+ "rollback('2022-01-04 10:00:00');",
+ "ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables: " +
+ "functional.alltypes");
+ AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback(id);", "EXECUTE ROLLBACK(<expression>): " +
+ "<expression> must be a constant expression: EXECUTE rollback(id)");
+ AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback(3.14);", "EXECUTE ROLLBACK(<expression>): <expression> " +
+ "must be an integer type or a timestamp, but is 'DECIMAL(3,2)': " +
+ "EXECUTE rollback(3.14)");
+ AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback('2021-02-32 15:52:45');", "An invalid TIMESTAMP expression has been " +
+ "given to EXECUTE ROLLBACK(<expression>): the expression " +
+ "'2021-02-32 15:52:45' cannot be converted to a TIMESTAMP");
+ AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback('the beginning');", "An invalid TIMESTAMP expression has been " +
+ "given to EXECUTE ROLLBACK(<expression>): the expression " +
+ "'the beginning' cannot be converted to a TIMESTAMP");
+ AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback(1111,2222);",
+ "EXECUTE ROLLBACK(<expression>): must have one parameter");
+ AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+ "rollback('1111');", "An invalid TIMESTAMP expression has been " +
+ "given to EXECUTE ROLLBACK(<expression>): the expression " +
+ "'1111' cannot be converted to a TIMESTAMP");
+ }
+
private static String buildLongOwnerName() {
StringBuilder comment = new StringBuilder();
for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
new file mode 100644
index 000000000..4c2c3cd18
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
@@ -0,0 +1,27 @@
+====
+---- QUERY
+# EXECUTE ROLLBACK with an invalid snapshot id on a partitioned Iceberg table.
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK(1)
+---- CATCH
+Cannot roll back to unknown snapshot id: 1
+====
+---- QUERY
+# EXECUTE ROLLBACK to a too old date on a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('2020-08-31 07:58:00')
+---- CATCH
+Cannot roll back, no valid snapshot older than: 1598853480000
+====
+---- QUERY
+# EXECUTE ROLLBACK to an invalid timestamp expression on a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('1111');
+---- CATCH
+An invalid TIMESTAMP expression has been given to EXECUTE ROLLBACK(<expression>): the expression '1111' cannot be converted to a TIMESTAMP
+====
+---- QUERY
+# EXECUTE ROLLBACK fails on a non-Iceberg table.
+ALTER TABLE functional_parquet.alltypestiny EXECUTE ROLLBACK(1111111)
+---- CATCH
+ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables
+====
\ No newline at end of file
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index dc7ac51c9..0a53c31f7 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -28,6 +28,7 @@ from tests.common.skip import SkipIfFS
from tests.util.hive_utils import HiveDbWrapper
from tests.util.event_processor_utils import EventProcessorUtils
from tests.util.filesystem_utils import WAREHOUSE
+from tests.util.iceberg_util import IcebergCatalogs
@SkipIfFS.hive
@@ -482,6 +483,7 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
def test_iceberg_self_events(self, unique_database):
"""This test checks that Impala doesn't refresh Iceberg tables on self events."""
tbl_name = unique_database + ".test_iceberg_events"
+ iceberg_catalogs = IcebergCatalogs(unique_database)
def check_self_events(query, skips_events=True):
tbls_refreshed_before, partitions_refreshed_before, \
@@ -495,19 +497,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
if skips_events:
assert events_skipped_after > events_skipped_before
- hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
- hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " +
- "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
- unique_database))
- hive_catalog = "'iceberg.catalog'='hive.catalog'"
- hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
- hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
-
- all_catalogs = [hadoop_tables, hadoop_catalog, hive_catalog, hive_catalogs,
- hadoop_catalogs]
-
- for catalog in all_catalogs:
- is_hive_catalog = catalog == hive_catalog or catalog == hive_catalogs
+ for catalog in iceberg_catalogs.get_iceberg_catalog_properties():
+ is_hive_catalog = iceberg_catalogs.is_a_hive_catalog(catalog)
self.client.execute("""
CREATE TABLE {0} (i int) STORED AS ICEBERG
TBLPROPERTIES ({1})""".format(tbl_name, catalog))
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 6f64ced0e..a8e05baf5 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -22,6 +22,8 @@ import logging
import os
import pytest
import random
+
+import re
import time
from subprocess import check_call
@@ -33,14 +35,14 @@ import json
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.iceberg_test_suite import IcebergTestSuite
-from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfLocal
+from tests.common.skip import SkipIf, SkipIfDockerizedCluster
from tests.common.file_utils import (
create_iceberg_table_from_directory,
create_table_from_parquet)
from tests.shell.util import run_impala_shell_cmd
from tests.util.filesystem_utils import get_fs_path, IS_HDFS
from tests.util.get_parquet_metadata import get_parquet_metadata
-from tests.util.iceberg_util import cast_ts, quote, parse_timestamp
+from tests.util.iceberg_util import cast_ts, quote, get_snapshots, IcebergCatalogs
LOG = logging.getLogger(__name__)
@@ -69,58 +71,61 @@ class TestIcebergTable(IcebergTestSuite):
def test_expire_snapshots(self, unique_database):
tbl_name = unique_database + ".expire_snapshots"
-
- # We are setting the TIMEZONE query option in this test, so let's create a local
- # impala client.
- with self.create_impala_client() as impalad_client:
- # Iceberg doesn't create a snapshot entry for the initial empty table
- impalad_client.execute("create table {0} (i int) stored as iceberg"
- .format(tbl_name))
- ts_0 = datetime.datetime.now()
- insert_q = "insert into {0} values (1)".format(tbl_name)
- ts_1 = self.execute_query_ts(impalad_client, insert_q)
- time.sleep(5)
- impalad_client.execute(insert_q)
- time.sleep(5)
- ts_2 = self.execute_query_ts(impalad_client, insert_q)
- impalad_client.execute(insert_q)
-
- # There should be 4 snapshots initially
- self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
- # Expire the oldest snapshot and test that the oldest one was expired
- expire_q = "alter table {0} execute expire_snapshots({1})"
- impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
- self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
- self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
-
- # Expire with a timestamp in which the interval does not touch existing snapshot
- impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
- self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
-
- # Expire all, but retain 1
- impalad_client.execute(expire_q.format(tbl_name,
- cast_ts(datetime.datetime.now())))
- self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
-
- # Change number of retained snapshots, then expire all
- impalad_client.execute("""alter table {0} set tblproperties
- ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
- impalad_client.execute(insert_q)
- impalad_client.execute(insert_q)
- impalad_client.execute(expire_q.format(tbl_name,
- cast_ts(datetime.datetime.now())))
- self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
-
- # Check that timezone is interpreted in local timezone controlled by query option
- # TIMEZONE.
- impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
- impalad_client.execute(insert_q)
- ts_tokyo = self.impala_now(impalad_client)
- impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
- impalad_client.execute(insert_q)
- impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
- impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
- self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+ iceberg_catalogs = IcebergCatalogs(unique_database)
+ for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+ # We are setting the TIMEZONE query option in this test, so let's create a local
+ # impala client.
+ with self.create_impala_client() as impalad_client:
+ # Iceberg doesn't create a snapshot entry for the initial empty table
+ impalad_client.execute("""
+ create table {0} (i int) stored as iceberg
+ TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+ ts_0 = datetime.datetime.now()
+ insert_q = "insert into {0} values (1)".format(tbl_name)
+ ts_1 = self.execute_query_ts(impalad_client, insert_q)
+ time.sleep(5)
+ impalad_client.execute(insert_q)
+ time.sleep(5)
+ ts_2 = self.execute_query_ts(impalad_client, insert_q)
+ impalad_client.execute(insert_q)
+
+ # There should be 4 snapshots initially
+ self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
+ # Expire the oldest snapshot and test that the oldest one was expired
+ expire_q = "alter table {0} execute expire_snapshots({1})"
+ impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+ self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+ self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
+
+ # Expire with a timestamp whose interval does not touch any existing snapshot
+ impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+ self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+
+ # Expire all, but retain 1
+ impalad_client.execute(expire_q.format(tbl_name,
+ cast_ts(datetime.datetime.now())))
+ self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
+
+ # Change number of retained snapshots, then expire all
+ impalad_client.execute("""alter table {0} set tblproperties
+ ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
+ impalad_client.execute(insert_q)
+ impalad_client.execute(insert_q)
+ impalad_client.execute(expire_q.format(tbl_name,
+ cast_ts(datetime.datetime.now())))
+ self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
+
+ # Check that the timestamp is interpreted in the local timezone controlled by
+ # the TIMEZONE query option.
+ impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+ impalad_client.execute(insert_q)
+ ts_tokyo = self.impala_now(impalad_client)
+ impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
+ impalad_client.execute(insert_q)
+ impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+ impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
+ self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+ impalad_client.execute("DROP TABLE {0}".format(tbl_name))
def test_truncate_iceberg_tables(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
@@ -128,7 +133,7 @@ class TestIcebergTable(IcebergTestSuite):
# With IMPALA-11429 there is an extra "ALTER TABLE SET OWNER" right after executing
# "CREATE TABLE". As a result dropping the table location right after CREATE TABLE will
# trigger a known bug: IMPALA-11509. Hence, turning this test off until there is a fix
- # for this issue. Note, we could add a sleep righ after table creation that could
+ # for this issue. Note, we could add a sleep right after table creation that could
# workaround the above mentioned bug but then we would hit another issue: IMPALA-11502.
@SkipIf.not_dfs
def test_drop_incomplete_table(self, vector, unique_database):
@@ -231,31 +236,127 @@ class TestIcebergTable(IcebergTestSuite):
tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name))
self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name))
self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name))
- result = self.client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
- assert(len(result.data) == 2)
- first_snapshot = result.data[0].split("\t")
- second_snapshot = result.data[1].split("\t")
+ snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2)
+ first_snapshot = snapshots[0]
+ second_snapshot = snapshots[1]
# Check that first snapshot is older than the second snapshot.
- assert(first_snapshot[0] < second_snapshot[0])
+ assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time())
# Check that second snapshot's parent ID is the snapshot ID of the first snapshot.
- assert(first_snapshot[1] == second_snapshot[2])
+ assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id())
# The first snapshot has no parent snapshot ID.
- assert(first_snapshot[2] == "NULL")
+ assert(first_snapshot.get_parent_id() is None)
# Check "is_current_ancestor" column.
- assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE")
+ assert(first_snapshot.is_current_ancestor())
+ assert(second_snapshot.is_current_ancestor())
+
+ def test_execute_rollback_negative(self, vector):
+ """Negative test for EXECUTE ROLLBACK."""
+ self.run_test_case('QueryTest/iceberg-rollback-negative', vector)
+
+ def test_execute_rollback(self, unique_database):
+ """Test for EXECUTE ROLLBACK."""
+ iceberg_catalogs = IcebergCatalogs(unique_database)
+ for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+ # Create a table with multiple snapshots.
+ tbl_name = unique_database + ".iceberg_execute_rollback"
+ # We are setting the TIMEZONE query option in this test, so let's create a local
+ # impala client.
+ with self.create_impala_client() as impalad_client:
+ impalad_client.execute("""
+ create table {0} (i int) stored as iceberg
+ TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+ initial_snapshots = 3
+ for i in range(initial_snapshots):
+ impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i))
+ snapshots = get_snapshots(impalad_client, tbl_name,
+ expected_result_size=initial_snapshots)
+
+ output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id())
+ LOG.info("success output={0}".format(output))
+
+ # We rolled back, but that creates a new snapshot, so now there are 4.
+ snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4)
+ # The new snapshot has the same id (and parent id) as the snapshot we rolled back
+ # to, but it has a different creation time.
+ assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id()
+ assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id()
+ assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time()
+ # The "orphaned" snapshot is now not a current ancestor.
+ assert not snapshots[2].is_current_ancestor()
+
+ # We cannot roll back to a snapshot that is not a current ancestor.
+ output = self.rollback_to_id_expect_failure(tbl_name,
+ snapshots[2].get_snapshot_id(),
+ expected_text="Cannot roll back to snapshot, not an ancestor of the current "
+ "state")
+
+ # Create another snapshot.
+ before_insert = datetime.datetime.now()
+ impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
+ snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+
+ # Rollback to before the last insert.
+ self.rollback_to_ts(impalad_client, tbl_name, before_insert)
+ # This creates another snapshot.
+ snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6)
+ # The snapshot id is the same, the dates differ
+ assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id()
+ assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time()
+ assert not snapshots[4].is_current_ancestor()
+
+ # Show that the EXECUTE ROLLBACK is respecting the current timezone.
+ # To do this we try to roll back to a time for which there is no
+ # snapshot; this will fail with an error message that includes the specified
+ # time. We parse out that time. By doing this in two timezones we can see
+ # that the parameter being used was affected by the current timezone.
+ one_hour_ago = before_insert - datetime.timedelta(hours=1)
+ # We use Timezones from Japan and Iceland to avoid any DST complexities.
+ impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+ japan_ts = self.get_snapshot_ts_from_failed_rollback(
+ impalad_client, tbl_name, one_hour_ago)
+ impalad_client.execute("SET TIMEZONE='Iceland'")
+ iceland_ts = self.get_snapshot_ts_from_failed_rollback(
+ impalad_client, tbl_name, one_hour_ago)
+ diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60)
+ assert diff_hours == 9
+
+ impalad_client.execute("DROP TABLE {0}".format(tbl_name))
+
+ def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts):
+ """Run an EXECUTE ROLLBACK which is expected to fail.
+ Parse the error message to extract the timestamp for which there
+ was no snapshot, and convert the string to an integer"""
+ try:
+ self.rollback_to_ts(client, tbl_name, ts)
+ assert False, "Query should have failed"
+ except ImpalaBeeswaxException as e:
+ result = re.search(r".*no valid snapshot older than: (\d+)", str(e))
+ time_str = result.group(1)
+ snapshot_ts = int(time_str)
+ assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result)
+ return snapshot_ts
+
+ def rollback_to_ts(self, client, tbl_name, ts):
+ """Rollback a table to a snapshot timestamp."""
+ query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat())
+ return self.execute_query_expect_success(client, query)
+
+ def rollback_to_id(self, tbl_name, id):
+ """Rollback a table to a snapshot id."""
+ query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+ return self.execute_query_expect_success(self.client, query)
+
+ def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None):
+ """Attempt to roll back a table to a snapshot id, expecting a failure."""
+ query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+ output = self.execute_query_expect_failure(self.client, query)
+ if expected_text:
+ assert expected_text in str(output)
+ return output
def test_describe_history_params(self, unique_database):
tbl_name = unique_database + ".describe_history"
- def expect_results_between(ts_start, ts_end, expected_result_size):
- query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format(
- tbl_name, cast_ts(ts_start), cast_ts(ts_end))
- data = impalad_client.execute(query)
- assert len(data.data) == expected_result_size
- for i in range(len(data.data)):
- result_ts_dt = parse_timestamp(data.data[i].split('\t')[0])
- assert result_ts_dt >= ts_start and result_ts_dt <= ts_end
-
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
@@ -279,10 +380,11 @@ class TestIcebergTable(IcebergTestSuite):
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
# Describe history with BETWEEN <ts> AND <ts> predicate
- expect_results_between(ts_1, ts_2, 1)
- expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2, 2)
- expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2 +
- datetime.timedelta(hours=1), 3)
+ self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1)
+ self.expect_results_between(impalad_client, tbl_name,
+ ts_1 - datetime.timedelta(hours=1), ts_2, 2)
+ self.expect_results_between(impalad_client, tbl_name,
+ ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3)
# Check that timezone is interpreted in local timezone controlled by query option
# TIMEZONE. Persist the local times first and create a new snapshot.
@@ -341,14 +443,6 @@ class TestIcebergTable(IcebergTestSuite):
tbl_name, snapshot_id),
expected)
- def get_snapshots():
- data = impalad_client.execute("describe history {0}".format(tbl_name))
- ret = list()
- for row in data.data:
- fields = row.split('\t')
- ret.append(fields[1])
- return ret
-
def impala_now():
now_data = impalad_client.execute("select now()")
return now_data.data[0]
@@ -409,12 +503,12 @@ class TestIcebergTable(IcebergTestSuite):
ij_cols)
# Query table as of snapshot IDs.
- snapshots = get_snapshots()
- expect_results_v(snapshots[0], ['1'], i_cols)
- expect_results_v(snapshots[1], ['1', '2'], i_cols)
- expect_results_v(snapshots[2], [], i_cols)
- expect_results_v(snapshots[3], ['100'], i_cols)
- expect_results_v(snapshots[4], ['100\tNULL', '3\t103'], ij_cols)
+ snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+ expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols)
+ expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols)
+ expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols)
+ expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols)
+ expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols)
# Test of plain count star optimization
# 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile
@@ -427,11 +521,11 @@ class TestIcebergTable(IcebergTestSuite):
expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
expect_for_count_star_t(cast_ts(ts_5), '2')
expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2')
- expect_for_count_star_v(snapshots[0], '1')
- expect_for_count_star_v(snapshots[1], '2')
- expect_for_count_star_v(snapshots[2], '0')
- expect_for_count_star_v(snapshots[3], '1')
- expect_for_count_star_v(snapshots[4], '2')
+ expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1')
+ expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2')
+ expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0')
+ expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1')
+ expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2')
# SELECT diff
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
@@ -442,18 +536,19 @@ class TestIcebergTable(IcebergTestSuite):
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
MINUS
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
- tbl=tbl_name, v_new=snapshots[1], v_old=snapshots[0]),
+ tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(),
+ v_old=snapshots[0].get_snapshot_id()),
['2'], i_cols)
# Mix SYSTEM_TIME and SYSTEM_VERSION
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
MINUS
SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
- tbl=tbl_name, v_new=snapshots[1], ts_old=ts_1),
+ tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1),
['2'], i_cols)
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
MINUS
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
- tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0]),
+ tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()),
['2'], i_cols)
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
MINUS
diff --git a/tests/util/iceberg_util.py b/tests/util/iceberg_util.py
index 924b5e249..21703acfd 100644
--- a/tests/util/iceberg_util.py
+++ b/tests/util/iceberg_util.py
@@ -106,3 +106,34 @@ def get_snapshots(impalad_client, tbl_name, ts_start=None, ts_end=None,
for snapshot_str in rows.data:
results.append(Snapshot(snapshot_str))
return results
+
+
+class IcebergCatalogs:
+ """Utility class to generate TBLPROPERTIES corresponding to various iceberg catalogs."""
+
+ def __init__(self, database):
+ """Create an IcebergCatalogs object parameterized by database name."""
+ self.database = database
+
+ hive_catalog = "'iceberg.catalog'='hive.catalog'"
+ hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
+ hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
+ hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
+
+ def get_iceberg_catalog_properties(self):
+ """Return a list containing TBLPROPERTIES corresponding to various iceberg catalogs.
+ The TBLPROPERTIES can be used to create tables."""
+ hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', "
+ + "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
+ self.database))
+ return [
+ self.hadoop_tables,
+ self.hive_catalog,
+ self.hive_catalogs,
+ self.hadoop_catalogs,
+ hadoop_catalog
+ ]
+
+ def is_a_hive_catalog(self, catalog):
+ """Return true if the catalog property is for a Hive catalog."""
+ return catalog == self.hive_catalog or catalog == self.hive_catalogs