You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2023/04/27 14:05:47 UTC

[hive] branch master updated: HIVE-27197: Iceberg:Support Iceberg version travel by reference name (Butao Zhang, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d01b1860c42 HIVE-27197: Iceberg:Support Iceberg version travel by reference name (Butao Zhang, reviewed by Denys Kuzmenko)
d01b1860c42 is described below

commit d01b1860c42f3d61009e1c23d4947bc74138ad0f
Author: Butao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Thu Apr 27 22:05:41 2023 +0800

    HIVE-27197: Iceberg:Support Iceberg version travel by reference name (Butao Zhang, reviewed by Denys Kuzmenko)
    
    Closes #4173
---
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 13 +++++++++-
 .../iceberg/mr/hive/TestHiveIcebergTimeTravel.java | 30 ++++++++++++++++++++++
 .../apache/hadoop/hive/ql/parse/FromClauseParser.g |  2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 30 ++++++++++++----------
 4 files changed, 60 insertions(+), 15 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 4ad91ea6858..3e32c11e6d3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.Scan;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -114,7 +115,17 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
   private static TableScan createTableScan(Table table, Configuration conf) {
     TableScan scan = table.newScan();
 
-    long snapshotId = conf.getLong(InputFormatConfig.SNAPSHOT_ID, -1);
+    long snapshotId = -1;
+    try {
+      snapshotId = conf.getLong(InputFormatConfig.SNAPSHOT_ID, -1);
+    } catch (NumberFormatException e) {
+      String version = conf.get(InputFormatConfig.SNAPSHOT_ID);
+      SnapshotRef ref = table.refs().get(version);
+      if (ref == null) {
+        throw new RuntimeException("Cannot find matching snapshot ID or reference name for version " + version);
+      }
+      snapshotId = ref.snapshotId();
+    }
     if (snapshotId != -1) {
       scan = scan.useSnapshot(snapshotId);
     }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
index be865817d13..7c64eb68c93 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
@@ -87,6 +87,47 @@ public class TestHiveIcebergTimeTravel extends HiveIcebergStorageHandlerWithEngi
     }
   }
 
+  /**
+   * Verifies that FOR SYSTEM_VERSION AS OF accepts a branch name, and that an unknown reference
+   * name makes the query fail with a descriptive error instead of silently succeeding.
+   */
+  @Test
+  public void testSelectAsOfBranchReference() throws IOException, InterruptedException {
+    Table table = testTables.createTableWithVersions(shell, "customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);
+
+    long firstSnapshotId = table.history().get(0).snapshotId();
+    table.manageSnapshots().createBranch("main_branch", firstSnapshotId).commit();
+    List<Object[]> rows =
+        shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 'main_branch'");
+
+    Assert.assertEquals(3, rows.size());
+
+    long secondSnapshotId = table.history().get(1).snapshotId();
+    table.manageSnapshots().createBranch("test_branch", secondSnapshotId).commit();
+    rows = shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 'test_branch'");
+
+    Assert.assertEquals(4, rows.size());
+
+    // Record the failure and assert on it after the try block: with the assertions only inside the
+    // catch clause, the test would pass vacuously if the query unexpectedly succeeded.
+    Throwable failure = null;
+    try {
+      shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 'unknown_branch'");
+    } catch (Throwable e) {
+      failure = e;
+    }
+    Assert.assertNotNull("Query on an unknown branch should have failed", failure);
+    // Walk to the innermost cause, where the lookup error is expected (outer layers may wrap it).
+    while (failure.getCause() != null) {
+      failure = failure.getCause();
+    }
+    Assert.assertTrue("Unexpected root cause: " + failure,
+        String.valueOf(failure.getMessage())
+            .contains("Cannot find matching snapshot ID or reference name for version unknown_branch"));
+  }
+
   @Test
   public void testCTASAsOfVersionAndTimestamp() throws IOException, InterruptedException {
     Table table = testTables.createTableWithVersions(shell, "customers",
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
index abeb38305dc..c1dc2224274 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
@@ -220,7 +220,7 @@ asOfClause
     (KW_FOR KW_SYSTEM_TIME KW_AS KW_OF asOfTime=expression)
     -> ^(TOK_AS_OF_TIME $asOfTime)
     |
-    (KW_FOR KW_SYSTEM_VERSION KW_AS KW_OF asOfVersion=Number)
+    (KW_FOR KW_SYSTEM_VERSION KW_AS KW_OF asOfVersion=expression)
     -> ^(TOK_AS_OF_VERSION $asOfVersion)
     ;
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e68c76f4043..fb9ae347ed0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1131,21 +1131,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     if (asOfTimeIndex != -1 || asOfVersionIndex != -1 || asOfVersionFromIndex != -1) {
-      String asOfVersion = asOfVersionIndex == -1 ? null : tabref.getChild(asOfVersionIndex).getChild(0).getText();
+      String asOfVersion = asOfVersionIndex == -1 ? null : getAsOfValue(tabref, asOfVersionIndex);
       String asOfVersionFrom =
           asOfVersionFromIndex == -1 ? null : tabref.getChild(asOfVersionFromIndex).getChild(0).getText();
-      String asOfTime = null;
-      
-      if (asOfTimeIndex != -1) {
-        ASTNode expr = (ASTNode) tabref.getChild(asOfTimeIndex).getChild(0);
-        if (expr.getChildCount() > 0) {
-          ExprNodeDesc desc = genExprNodeDesc(expr, new RowResolver(), false, true);
-          ExprNodeConstantDesc c = (ExprNodeConstantDesc) desc;
-          asOfTime = String.valueOf(c.getValue());
-        } else {
-          asOfTime = stripQuotes(expr.getText());
-        }
-      }
+      String asOfTime = asOfTimeIndex == -1 ? null : getAsOfValue(tabref, asOfTimeIndex);
       qb.setSystemVersion(alias, new QBSystemVersion(asOfVersion, asOfVersionFrom, asOfTime));
     }
 
@@ -1240,6 +1229,33 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return alias;
   }
 
+  /**
+   * Resolves the value of an AS OF clause (FOR SYSTEM_TIME / FOR SYSTEM_VERSION) of a table reference.
+   *
+   * @param tabref the table reference node
+   * @param asOfIndex index of the AS OF child within {@code tabref}, or -1 if the clause is absent
+   * @return the value as a string (a literal's text with quotes stripped, or the folded constant's
+   *         value), or null if {@code asOfIndex} is -1
+   * @throws SemanticException if the clause is an expression that does not evaluate to a constant
+   */
+  private String getAsOfValue(ASTNode tabref, int asOfIndex) throws SemanticException {
+    if (asOfIndex == -1) {
+      return null;
+    }
+    ASTNode expr = (ASTNode) tabref.getChild(asOfIndex).getChild(0);
+    if (expr.getChildCount() == 0) {
+      // A bare literal, e.g. 1234567890 or 'branch_name'.
+      return stripQuotes(expr.getText());
+    }
+    // Compound expressions are evaluated with an empty RowResolver (no columns in scope). Reject a
+    // non-constant result explicitly instead of failing with a ClassCastException on the cast.
+    ExprNodeDesc desc = genExprNodeDesc(expr, new RowResolver(), false, true);
+    if (!(desc instanceof ExprNodeConstantDesc)) {
+      throw new SemanticException("AS OF clause must evaluate to a constant value: " + expr.toStringTree());
+    }
+    return String.valueOf(((ExprNodeConstantDesc) desc).getValue());
+  }
+
   Map<String, SplitSample> getNameToSplitSampleMap() {
     return this.nameToSplitSample;
   }