You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/01/15 10:02:21 UTC
[hive] branch master updated: HIVE-26927: Iceberg: Add support for set_current_snapshotid. (#3937). (Ayush Saxena, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 55805205ddf HIVE-26927: Iceberg: Add support for set_current_snapshotid. (#3937). (Ayush Saxena, reviewed by Denys Kuzmenko)
55805205ddf is described below
commit 55805205ddfbfa9cea3a150bbec6ef551b141416
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sun Jan 15 15:32:13 2023 +0530
HIVE-26927: Iceberg: Add support for set_current_snapshotid. (#3937). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 7 +++
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 13 +++++
.../mr/hive/TestHiveIcebergSetCurrentSnapshot.java | 63 ++++++++++++++++++++++
.../hadoop/hive/ql/parse/AlterClauseParser.g | 2 +
.../apache/hadoop/hive/ql/parse/HiveLexerParent.g | 1 +
.../hadoop/hive/ql/parse/IdentifiersParser.g | 1 +
.../table/execute/AlterTableExecuteAnalyzer.java | 7 +++
.../hive/ql/parse/AlterTableExecuteSpec.java | 26 ++++++++-
8 files changed, 119 insertions(+), 1 deletion(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 4b225e4719b..dac0da89331 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -533,6 +533,13 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
(AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
break;
+ case SET_CURRENT_SNAPSHOT:
+ AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
+ (AlterTableExecuteSpec.SetCurrentSnapshotSpec) executeSpec.getOperationParams();
+ LOG.debug("Executing set current snapshot operation on iceberg table {}.{} to version {}", hmsTable.getDbName(),
+ hmsTable.getTableName(), setSnapshotVersionSpec.getSnapshotId());
+ IcebergTableUtil.setCurrentSnapshot(icebergTable, setSnapshotVersionSpec.getSnapshotId());
+ break;
default:
throw new UnsupportedOperationException(
String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 60a9158ecab..23244fc1adb 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -229,4 +229,17 @@ public class IcebergTableUtil {
}
manageSnapshots.commit();
}
+
+  /**
+   * Set the current snapshot of the iceberg table to the snapshot with the given id and commit the change.
+   * @param table the iceberg table
+   * @param value the id of the snapshot that should become the table's current snapshot; must not be null
+   * @throws NullPointerException if value is null
+   */
+  public static void setCurrentSnapshot(Table table, Long value) {
+    // Fail with a descriptive message instead of an anonymous NPE when unboxing into setCurrentSnapshot(long).
+    java.util.Objects.requireNonNull(value, "Snapshot id to set as current must not be null");
+    ManageSnapshots manageSnapshots = table.manageSnapshots();
+    // currentSnapshot() returns null for a table without snapshots. Log arguments are evaluated even when
+    // debug logging is disabled, so dereferencing it unconditionally could fail the whole operation.
+    Long currentSnapshotId = null;
+    if (table.currentSnapshot() != null) {
+      currentSnapshotId = table.currentSnapshot().snapshotId();
+    }
+    LOG.debug("Setting the current snapshot of iceberg table {} from snapshot id {} to snapshot id {}",
+        table.name(), currentSnapshotId, value);
+    manageSnapshots.setCurrentSnapshot(value);
+    manageSnapshots.commit();
+  }
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java
new file mode 100644
index 00000000000..e45e4b26ee0
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the {@code ALTER TABLE ... EXECUTE SET_CURRENT_SNAPSHOT(<snapshot id>)} command on Iceberg tables.
+ */
+public class TestHiveIcebergSetCurrentSnapshot extends HiveIcebergStorageHandlerWithEngineBase {
+
+  @Test
+  public void testSetCurrentSnapshot() throws IOException, InterruptedException {
+    TableIdentifier identifier = TableIdentifier.of("default", "source");
+    Table table = testTables.createTableWithVersions(shell, identifier.name(),
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
+    Assert.assertEquals(5, table.history().size());
+
+    // Ids of the last two versions, read once from the table handle created above.
+    long latestSnapshotId = table.history().get(4).snapshotId();
+    long previousSnapshotId = table.history().get(3).snapshotId();
+
+    // Record what time travel returns for each version so plain reads can be compared against it later.
+    List<Object[]> latestRows = shell.executeStatement(
+        "SELECT * from " + identifier.name() + " FOR SYSTEM_VERSION AS OF " + latestSnapshotId);
+    List<Object[]> previousRows = shell.executeStatement(
+        "SELECT * from " + identifier.name() + " FOR SYSTEM_VERSION AS OF " + previousSnapshotId);
+
+    // Move the table back one version, then forward again, checking the visible data after each step.
+    setCurrentSnapshotAndValidate(identifier, previousSnapshotId, previousRows);
+    setCurrentSnapshotAndValidate(identifier, latestSnapshotId, latestRows);
+  }
+
+  /**
+   * Runs SET_CURRENT_SNAPSHOT for the given snapshot id and checks that an unqualified SELECT on the table
+   * returns the same rows as the expected result.
+   */
+  private void setCurrentSnapshotAndValidate(TableIdentifier identifier, long snapshotId,
+      List<Object[]> expected) {
+    shell.executeStatement(
+        "ALTER TABLE " + identifier.name() + " EXECUTE SET_CURRENT_SNAPSHOT(" + snapshotId + ")");
+    List<Object[]> actual = shell.executeStatement("SELECT * from " + identifier.name());
+    Assert.assertEquals(expected.size(), actual.size());
+    HiveIcebergTestUtils.validateData(
+        HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, actual),
+        HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, expected), 0);
+  }
+}
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index b29e0443b93..2542909cb80 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -465,6 +465,8 @@ alterStatementSuffixExecute
-> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS LPAREN (expireParam=StringLiteral) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam)
+ | KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=Number) RPAREN
+ -> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
;
fileFormat
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
index f6d90940632..35b054c08d3 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
@@ -390,6 +390,7 @@ KW_SPEC: 'SPEC';
KW_SYSTEM_TIME: 'SYSTEM_TIME';
KW_SYSTEM_VERSION: 'SYSTEM_VERSION';
KW_EXPIRE_SNAPSHOTS: 'EXPIRE_SNAPSHOTS';
+// Keyword for ALTER TABLE ... EXECUTE SET_CURRENT_SNAPSHOT(<snapshot id>); also listed as non-reserved.
+KW_SET_CURRENT_SNAPSHOT: 'SET_CURRENT_SNAPSHOT';
// Operators
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 921061a635a..2e7d12309e8 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -976,6 +976,7 @@ nonReserved
| KW_SPEC
| KW_SYSTEM_TIME | KW_SYSTEM_VERSION
| KW_EXPIRE_SNAPSHOTS
+ | KW_SET_CURRENT_SNAPSHOT
;
//The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index 7f5d5dd43fc..6692c2413d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -44,6 +44,7 @@ import java.util.Map;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.ROLLBACK;
+import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.SET_CURRENT_SNAPSHOT;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec.RollbackType.TIME;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec.RollbackType.VERSION;
@@ -91,6 +92,12 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(child.getText()), timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
desc = new AlterTableExecuteDesc(tableName, partitionSpec, spec);
+ } else if (HiveParser.KW_SET_CURRENT_SNAPSHOT == executeCommandType.getType()) {
+ ASTNode child = (ASTNode) command.getChild(1);
+ AlterTableExecuteSpec<AlterTableExecuteSpec.SetCurrentSnapshotSpec> spec =
+ new AlterTableExecuteSpec(SET_CURRENT_SNAPSHOT,
+ new AlterTableExecuteSpec.SetCurrentSnapshotSpec(Long.valueOf(child.getText())));
+ desc = new AlterTableExecuteDesc(tableName, partitionSpec, spec);
}
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index d1bb59f5658..3f5a2099f8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -33,7 +33,8 @@ public class AlterTableExecuteSpec<T> {
public enum ExecuteOperationType {
ROLLBACK,
- EXPIRE_SNAPSHOT
+ EXPIRE_SNAPSHOT,
+ SET_CURRENT_SNAPSHOT
}
private final ExecuteOperationType operationType;
@@ -116,4 +117,27 @@ public class AlterTableExecuteSpec<T> {
return MoreObjects.toStringHelper(this).add("timestampMillis", timestampMillis).toString();
}
}
+
+  /**
+   * Value object class that stores the parameters of the set current snapshot operation.
+   * <ul>
+   *   <li>snapshotId: the id of the snapshot that should become the table's current snapshot
+   *   (not validated against the table here)</li>
+   * </ul>
+   */
+  public static class SetCurrentSnapshotSpec {
+    private final long snapshotId;
+
+    /**
+     * @param snapshotId the id of the snapshot to set as current; must not be null
+     * @throws NullPointerException if snapshotId is null
+     */
+    public SetCurrentSnapshotSpec(Long snapshotId) {
+      // Explicit message instead of an anonymous NPE from unboxing into the primitive field.
+      this.snapshotId = java.util.Objects.requireNonNull(snapshotId, "Snapshot id must not be null");
+    }
+
+    public Long getSnapshotId() {
+      return snapshotId;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("snapshotId", snapshotId).toString();
+    }
+  }
}