You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/01/15 10:02:21 UTC

[hive] branch master updated: HIVE-26927: Iceberg: Add support for set_current_snapshotid. (#3937). (Ayush Saxena, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 55805205ddf HIVE-26927: Iceberg: Add support for set_current_snapshotid. (#3937). (Ayush Saxena, reviewed by Denys Kuzmenko)
55805205ddf is described below

commit 55805205ddfbfa9cea3a150bbec6ef551b141416
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sun Jan 15 15:32:13 2023 +0530

    HIVE-26927: Iceberg: Add support for set_current_snapshotid. (#3937). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  7 +++
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   | 13 +++++
 .../mr/hive/TestHiveIcebergSetCurrentSnapshot.java | 63 ++++++++++++++++++++++
 .../hadoop/hive/ql/parse/AlterClauseParser.g       |  2 +
 .../apache/hadoop/hive/ql/parse/HiveLexerParent.g  |  1 +
 .../hadoop/hive/ql/parse/IdentifiersParser.g       |  1 +
 .../table/execute/AlterTableExecuteAnalyzer.java   |  7 +++
 .../hive/ql/parse/AlterTableExecuteSpec.java       | 26 ++++++++-
 8 files changed, 119 insertions(+), 1 deletion(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 4b225e4719b..dac0da89331 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -533,6 +533,13 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
             (AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
         icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
         break;
+      case SET_CURRENT_SNAPSHOT:
+        AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
+            (AlterTableExecuteSpec.SetCurrentSnapshotSpec) executeSpec.getOperationParams();
+        LOG.debug("Executing set current snapshot operation on iceberg table {}.{} to version {}", hmsTable.getDbName(),
+            hmsTable.getTableName(), setSnapshotVersionSpec.getSnapshotId());
+        IcebergTableUtil.setCurrentSnapshot(icebergTable, setSnapshotVersionSpec.getSnapshotId());
+        break;
       default:
         throw new UnsupportedOperationException(
             String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 60a9158ecab..23244fc1adb 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -229,4 +229,17 @@ public class IcebergTableUtil {
     }
     manageSnapshots.commit();
   }
+
+  /**
+   * Set the current snapshot for the iceberg table
+   * @param table the iceberg table
+   * @param value the id of the snapshot that should become the current snapshot of the table
+   */
+  public static void setCurrentSnapshot(Table table, Long value) {
+    // A table without snapshots has no current snapshot; guard the lookup, which is needed for logging only.
+    Long currentId = table.currentSnapshot() == null ? null : table.currentSnapshot().snapshotId();
+    LOG.debug("Rolling the iceberg table {} from snapshot id {} to snapshot ID {}", table.name(), currentId, value);
+    ManageSnapshots manageSnapshots = table.manageSnapshots().setCurrentSnapshot(value);
+    manageSnapshots.commit();
+  }
 }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java
new file mode 100644
index 00000000000..e45e4b26ee0
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the ALTER TABLE ... EXECUTE SET_CURRENT_SNAPSHOT feature.
+ */
+public class TestHiveIcebergSetCurrentSnapshot extends HiveIcebergStorageHandlerWithEngineBase {
+
+  @Test
+  public void testSetCurrentSnapshot() throws IOException, InterruptedException {
+    String tableName = TableIdentifier.of("default", "source").name();
+    Table table = testTables.createTableWithVersions(shell, tableName,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
+    Assert.assertEquals(5, table.history().size());
+    // Capture the ids of the two most recent history entries once; they are reused for every statement below.
+    long newestId = table.history().get(4).snapshotId();
+    long previousId = table.history().get(3).snapshotId();
+    String versionQuery = "SELECT * from " + tableName + " FOR SYSTEM_VERSION AS OF ";
+    List<Object[]> newestRows = shell.executeStatement(versionQuery + newestId);
+    List<Object[]> previousRows = shell.executeStatement(versionQuery + previousId);
+
+    // Move the table back to the previous snapshot, then forward to the newest one again.
+    setSnapshotAndVerify(tableName, previousId, previousRows);
+    setSnapshotAndVerify(tableName, newestId, newestRows);
+  }
+
+  /** Sets the given snapshot as current and checks that a plain SELECT returns the expected rows. */
+  private void setSnapshotAndVerify(String tableName, long snapshotId, List<Object[]> expected)
+      throws IOException, InterruptedException {
+    shell.executeStatement("ALTER TABLE " + tableName + " EXECUTE SET_CURRENT_SNAPSHOT(" + snapshotId + ")");
+    List<Object[]> actual = shell.executeStatement("SELECT * from " + tableName);
+    Assert.assertEquals(expected.size(), actual.size());
+    HiveIcebergTestUtils.validateData(
+        HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, actual),
+        HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, expected), 0);
+  }
+}
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index b29e0443b93..2542909cb80 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -465,6 +465,8 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam)
     | KW_EXECUTE KW_EXPIRE_SNAPSHOTS LPAREN (expireParam=StringLiteral) RPAREN
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam)
+    | KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=Number) RPAREN
+    -> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
     ;
 
 fileFormat
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
index f6d90940632..35b054c08d3 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
@@ -390,6 +390,7 @@ KW_SPEC: 'SPEC';
 KW_SYSTEM_TIME: 'SYSTEM_TIME';
 KW_SYSTEM_VERSION: 'SYSTEM_VERSION';
 KW_EXPIRE_SNAPSHOTS: 'EXPIRE_SNAPSHOTS';
+KW_SET_CURRENT_SNAPSHOT: 'SET_CURRENT_SNAPSHOT';
 
 
 // Operators
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 921061a635a..2e7d12309e8 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -976,6 +976,7 @@ nonReserved
     | KW_SPEC
     | KW_SYSTEM_TIME | KW_SYSTEM_VERSION
     | KW_EXPIRE_SNAPSHOTS
+    | KW_SET_CURRENT_SNAPSHOT
 ;
 
 //The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index 7f5d5dd43fc..6692c2413d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -44,6 +44,7 @@ import java.util.Map;
 
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.ROLLBACK;
+import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.SET_CURRENT_SNAPSHOT;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec.RollbackType.TIME;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec.RollbackType.VERSION;
 
@@ -91,6 +92,12 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
       TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(child.getText()), timeZone);
       spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
       desc = new AlterTableExecuteDesc(tableName, partitionSpec, spec);
+    } else if (HiveParser.KW_SET_CURRENT_SNAPSHOT == executeCommandType.getType()) {
+      ASTNode child = (ASTNode) command.getChild(1);
+      AlterTableExecuteSpec<AlterTableExecuteSpec.SetCurrentSnapshotSpec> spec =
+          new AlterTableExecuteSpec(SET_CURRENT_SNAPSHOT,
+              new AlterTableExecuteSpec.SetCurrentSnapshotSpec(Long.valueOf(child.getText())));
+      desc = new AlterTableExecuteDesc(tableName, partitionSpec, spec);
     }
 
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index d1bb59f5658..3f5a2099f8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -33,7 +33,8 @@ public class AlterTableExecuteSpec<T> {
 
   public enum ExecuteOperationType {
     ROLLBACK,
-    EXPIRE_SNAPSHOT
+    EXPIRE_SNAPSHOT,
+    SET_CURRENT_SNAPSHOT
   }
 
   private final ExecuteOperationType operationType;
@@ -116,4 +117,27 @@ public class AlterTableExecuteSpec<T> {
       return MoreObjects.toStringHelper(this).add("timestampMillis", timestampMillis).toString();
     }
   }
+
+  /**
+   * Value object holding the parameter of the set current snapshot operation:
+   * the id of the snapshot that should become the current one.
+   */
+  public static class SetCurrentSnapshotSpec {
+    private final long snapshotId;
+
+    public SetCurrentSnapshotSpec(Long snapshotId) {
+      this.snapshotId = snapshotId.longValue();
+    }
+
+    /** Returns the id of the target snapshot. */
+    public Long getSnapshotId() {
+      return Long.valueOf(snapshotId);
+    }
+
+    @Override
+    public String toString() {
+      MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this).add("snapshotId", snapshotId);
+      return helper.toString();
+    }
+  }
 }