Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/07/27 11:15:27 UTC

[GitHub] [hive] marton-bod commented on a change in pull request #2512: HIVE-25344: Add a possibility to query Iceberg table snapshots based on the timestamp or the snapshot id

marton-bod commented on a change in pull request #2512:
URL: https://github.com/apache/hive/pull/2512#discussion_r677298347



##########
File path: common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
##########
@@ -474,6 +474,7 @@
   NULL_TREATMENT_NOT_SUPPORTED(10426, "Function {0} does not support null treatment.", true),
   DATACONNECTOR_ALREADY_EXISTS(10427, "Dataconnector {0} already exists", true),
   DATACONNECTOR_NOT_EXISTS(10428, "Dataconnector does not exist:"),
+  TIME_TRAVEL_NOT_ALLOWED(10429, "Time travel is not allowed for {0}. Please chose a storage format which supports the feature.", true),

Review comment:
       typo: choose

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
     checkColStat("target", "dept");
   }
 
+  @Test
+  public void testAsOfTimestamp() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    List<Object[]> rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "'");
+
+    Assert.assertEquals(3, rows.size());
+
+    rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "'");
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find a snapshot older than 1970-01-01 00:00:00", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'");
+        });
+  }
+
+  @Test
+  public void testAsOfVersion() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    HistoryEntry first = table.history().get(0);
+    List<Object[]> rows =
+        shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + first.snapshotId());
+
+    Assert.assertEquals(3, rows.size());
+
+    HistoryEntry second = table.history().get(1);
+    rows = shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + second.snapshotId());
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find snapshot with ID 1234", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 1234");
+        });
+  }
+
+  @Test
+  public void testAsOfTimestampWithJoins() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(4);
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "' fv, " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv " +
+        "WHERE fv.first_name=sv.first_name");
+
+    Assert.assertEquals(4, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv, " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' tv " +
+         "WHERE sv.first_name=tv.first_name");
+
+    Assert.assertEquals(8, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' sv, " +
+        "customers lv " +
+        "WHERE sv.first_name=lv.first_name");
+
+    Assert.assertEquals(14, rows.size());
+  }
+
+  /**
+   * Creates the 'customers' table with the default records and creates extra snapshots by inserting one more line
+   * into the table.
+   * @param versions The number of snapshots we want to create

Review comment:
       nit: don't we create a snapshot for the initial table creation? (so we end up with versions + 1 in the end?)

##########
File path: service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
##########
@@ -494,7 +494,7 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
       }
       return rowSet;
     } catch (Exception e) {
-      throw new HiveSQLException("Unable to get the next row set", e);
+      throw new HiveSQLException("Unable to get the next row set with exception: " + e.getMessage(), e);

Review comment:
       This is very much needed, thanks! :)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
##########
@@ -1261,4 +1275,19 @@ public boolean equalsWithIgnoreWriteId(Table tbl ) {
     return result;
   }
 
+  public long getAsOfVersion() {
+    return asOfVersion;
+  }
+
+  public void setAsOfVersion(long asOfVersion) {

Review comment:
       nit: not that I want to complicate our own lives, but shall we consider making the version field a string to be more generic, in case other storage handlers implement this in the future, e.g. with git-like hash versions? Then we could accept any string and simply convert it to a long if it's an Iceberg query. If we set in stone now that the version must be a number at the syntax level, amending that later would be a breaking change. But it's just food for thought; again, I don't want to complicate things either.
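
       A minimal sketch of the conversion idea above, assuming the version travels as a generic string and only the Iceberg path turns it into a numeric snapshot id. The class and method names are hypothetical and are not part of Hive or Iceberg.

```java
/** Hypothetical helper for illustration only; not an existing Hive or Iceberg class. */
final class AsOfVersionSketch {
  private AsOfVersionSketch() {
  }

  /**
   * Converts a generic version string into a numeric snapshot id.
   * Throws IllegalArgumentException when the string is not a valid long,
   * e.g. when a git-like hash is passed to a handler that expects numbers.
   */
  static long toSnapshotId(String version) {
    if (version == null) {
      throw new IllegalArgumentException("Version must not be null");
    }
    try {
      return Long.parseLong(version.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Not a numeric snapshot id: " + version, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(toSnapshotId("1234"));
  }
}
```

       With this shape the syntax level could stay string-based, and each storage handler would decide how to interpret the value.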

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
     checkColStat("target", "dept");
   }
 
+  @Test
+  public void testAsOfTimestamp() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    List<Object[]> rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "'");
+
+    Assert.assertEquals(3, rows.size());
+
+    rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "'");
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,

Review comment:
       What happens if we provide a timestamp in the far future? Will it default to the current snapshot? 
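
       To make the question concrete, here is a small self-contained sketch of an "at or before" lookup over snapshot timestamps. It is an assumption about the semantics, not Iceberg's actual implementation; the class name and the ascending-order input are hypothetical. Under this assumption a far-future timestamp would resolve to the latest snapshot, while a timestamp before the first snapshot would fail, as in the 1970 case tested above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of an "as of timestamp" lookup; not Iceberg code. */
final class SnapshotLookupSketch {
  private SnapshotLookupSketch() {
  }

  /**
   * Returns the position of the last snapshot whose timestamp is at or before asOfMillis.
   * Assumes snapshotTimesMillis is sorted in ascending order.
   */
  static int positionAsOf(List<Long> snapshotTimesMillis, long asOfMillis) {
    int found = -1;
    for (int i = 0; i < snapshotTimesMillis.size(); i++) {
      if (snapshotTimesMillis.get(i) <= asOfMillis) {
        found = i;
      }
    }
    if (found < 0) {
      throw new IllegalArgumentException("Cannot find a snapshot older than " + asOfMillis);
    }
    return found;
  }

  public static void main(String[] args) {
    List<Long> history = Arrays.asList(1000L, 2000L);
    // A far-future timestamp selects the last entry under the assumed semantics.
    System.out.println(positionAsOf(history, Long.MAX_VALUE));
  }
}
```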

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
     checkColStat("target", "dept");
   }
 
+  @Test
+  public void testAsOfTimestamp() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    List<Object[]> rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "'");
+
+    Assert.assertEquals(3, rows.size());
+
+    rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "'");
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find a snapshot older than 1970-01-01 00:00:00", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'");
+        });
+  }
+
+  @Test
+  public void testAsOfVersion() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    HistoryEntry first = table.history().get(0);
+    List<Object[]> rows =
+        shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + first.snapshotId());
+
+    Assert.assertEquals(3, rows.size());
+
+    HistoryEntry second = table.history().get(1);
+    rows = shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + second.snapshotId());
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find snapshot with ID 1234", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 1234");
+        });
+  }
+
+  @Test
+  public void testAsOfTimestampWithJoins() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(4);
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "' fv, " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv " +
+        "WHERE fv.first_name=sv.first_name");
+
+    Assert.assertEquals(4, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv, " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' tv " +
+         "WHERE sv.first_name=tv.first_name");
+
+    Assert.assertEquals(8, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' sv, " +
+        "customers lv " +
+        "WHERE sv.first_name=lv.first_name");
+
+    Assert.assertEquals(14, rows.size());
+  }
+
+  /**
+   * Creates the 'customers' table with the default records and creates extra snapshots by inserting one more line
+   * into the table.
+   * @param versions The number of snapshots we want to create
+   * @return The table created
+   * @throws IOException When there is a problem during table creation
+   * @throws InterruptedException When there is a problem during adding new data to the table
+   */
+  private Table prepareTableWithVersions(int versions) throws IOException, InterruptedException {
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    for (int i = 0; i < versions - 1; ++i) {
+      // Just wait a little so we definitely will not have the same timestamp for the snapshots
+      Thread.sleep(100);
+      shell.executeStatement("INSERT INTO customers values(" +
+          (i + HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size()) + ",'Alice','Green_" + i + "')");
+    }
+
+    table.refresh();
+
+    return table;
+  }
+
+  /**
+   * Get the timestamp string which we can use in the queries. The timestamp will be after the given snapshot
+   * and before the next one
+   * @param table The table which we want to query
+   * @param snapshotPosition The position of the last snapshot we want to see in the query results
+   * @return The timestamp which we can use in the queries
+   */
+  private String timestampAfterSnapshot(Table table, int snapshotPosition) {
+    List<HistoryEntry> history = table.history();
+    long snapshotTime = history.get(snapshotPosition).timestampMillis();
+    long time = snapshotTime + 100;
+    if (history.size() > snapshotPosition + 1) {
+      time = snapshotTime + ((history.get(snapshotPosition + 1).timestampMillis() - snapshotTime) / 2);
+    }
+
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");

Review comment:
       Shall we factor this into a constant somewhere, or have a util method for this conversion? I think we'll need this for the metadata table output too
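
       A possible shape for such a util, as a sketch only. The pattern string is the one from the diff; the class name, the explicit time zone parameter, and the use of Locale.ROOT are assumptions added for illustration (the test code in the diff relies on the JVM defaults instead).

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical util for illustration; the name and location are not from the PR. */
final class TimeTravelTimestampSketch {
  /** Millisecond precision padded with literal zeros to nine fractional digits. */
  static final String PATTERN = "yyyy-MM-dd HH:mm:ss.SSS000000";

  private TimeTravelTimestampSketch() {
  }

  /** Formats epoch milliseconds in the given zone. A new formatter is created per call
   *  because SimpleDateFormat is not thread-safe. */
  static String format(long epochMillis, TimeZone zone) {
    SimpleDateFormat formatter = new SimpleDateFormat(PATTERN, Locale.ROOT);
    formatter.setTimeZone(zone);
    return formatter.format(new Date(epochMillis));
  }

  public static void main(String[] args) {
    System.out.println(format(System.currentTimeMillis(), TimeZone.getDefault()));
  }
}
```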

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
     checkColStat("target", "dept");
   }
 
+  @Test
+  public void testAsOfTimestamp() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    List<Object[]> rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "'");
+
+    Assert.assertEquals(3, rows.size());
+
+    rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "'");
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find a snapshot older than 1970-01-01 00:00:00", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'");
+        });
+  }
+
+  @Test
+  public void testAsOfVersion() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    HistoryEntry first = table.history().get(0);
+    List<Object[]> rows =
+        shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + first.snapshotId());
+
+    Assert.assertEquals(3, rows.size());
+
+    HistoryEntry second = table.history().get(1);
+    rows = shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + second.snapshotId());
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find snapshot with ID 1234", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 1234");
+        });
+  }
+
+  @Test
+  public void testAsOfTimestampWithJoins() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(4);
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "' fv, " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv " +
+        "WHERE fv.first_name=sv.first_name");
+
+    Assert.assertEquals(4, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv, " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' tv " +
+         "WHERE sv.first_name=tv.first_name");
+
+    Assert.assertEquals(8, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' sv, " +
+        "customers lv " +
+        "WHERE sv.first_name=lv.first_name");
+
+    Assert.assertEquals(14, rows.size());

Review comment:
       Can we add one more test case, where we query one table in the join with timestamp and the other with version?
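
       For reference, the mixed query could look roughly like the one built below. This is only a sketch of the statement text, reusing the syntax from the tests above; the helper class and the aliases are hypothetical, and the expected row count would still have to be worked out against the test data.

```java
/** Hypothetical query builder for illustration; not part of the test class in the PR. */
final class MixedTimeTravelQuerySketch {
  private MixedTimeTravelQuerySketch() {
  }

  /** Joins one time-based and one version-based view of the same table on first_name. */
  static String joinTimestampWithVersion(String table, String timestamp, long snapshotId) {
    return "SELECT * FROM " +
        table + " FOR SYSTEM_TIME AS OF '" + timestamp + "' tv, " +
        table + " FOR SYSTEM_VERSION AS OF " + snapshotId + " vv " +
        "WHERE tv.first_name=vv.first_name";
  }

  public static void main(String[] args) {
    System.out.println(joinTimestampWithVersion("customers", "2021-07-27 11:15:27", 1234L));
  }
}
```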

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
##########
@@ -2223,6 +2237,22 @@ private void getMetaData(QB qb, ReadEntity parentInput)
         }
       }
 
+      Pair<String, String> asOf = qb.getAsOfForAlias(alias);
+      if (asOf != null) {
+        if (!Optional.ofNullable(tab.getStorageHandler()).map(HiveStorageHandler::isTimeTravelAllowed).orElse(false)) {
+          throw new SemanticException(ErrorMsg.TIME_TRAVEL_NOT_ALLOWED, alias);
+        }
+        if (asOf.getLeft() != null) {
+          tab.setAsOfVersion(Long.parseLong(asOf.getLeft()));
+        }
+        if (asOf.getRight() != null) {
+          ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() :
+                                 SessionState.get().getConf().getLocalTimeZone();
+          TimestampTZ ts = TimestampTZUtil.parse(stripQuotes(asOf.getRight()), timeZone);

Review comment:
       So this means that the timestamp the user passes into the query will be considered as a timestamp in the user's local time zone (as opposed to a UTC timezone)?
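
       To illustrate why the distinction matters, the sketch below parses the same literal in two zones using plain java.time. It does not use Hive's TimestampTZUtil, and the class name is hypothetical; it only shows that the resulting instant, and therefore potentially the selected snapshot, shifts with the zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration using java.time only; not Hive's parsing code. */
final class TimeZoneInterpretationSketch {
  private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

  private TimeZoneInterpretationSketch() {
  }

  /** Interprets a zone-less timestamp literal in the given zone and returns epoch milliseconds. */
  static long toEpochMillis(String literal, ZoneId zone) {
    return LocalDateTime.parse(literal, FORMAT).atZone(zone).toInstant().toEpochMilli();
  }

  public static void main(String[] args) {
    String literal = "2021-07-27 11:15:27";
    long asUtc = toEpochMillis(literal, ZoneId.of("UTC"));
    long asPlusTwo = toEpochMillis(literal, ZoneId.of("+02:00"));
    // The same literal maps to instants two hours apart.
    System.out.println(asUtc - asPlusTwo);
  }
}
```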

##########
File path: iceberg/iceberg-handler/src/test/queries/negative/timetravel_by_time_non_iceberg.q
##########
@@ -0,0 +1,2 @@
+create table tbl_orc (a int, b string);

Review comment:
       Do we have positive qtests too for Iceberg tables?

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
     checkColStat("target", "dept");
   }
 
+  @Test
+  public void testAsOfTimestamp() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    List<Object[]> rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "'");
+
+    Assert.assertEquals(3, rows.size());
+
+    rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "'");
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find a snapshot older than 1970-01-01 00:00:00", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'");
+        });
+  }
+
+  @Test
+  public void testAsOfVersion() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    HistoryEntry first = table.history().get(0);
+    List<Object[]> rows =
+        shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + first.snapshotId());
+
+    Assert.assertEquals(3, rows.size());
+
+    HistoryEntry second = table.history().get(1);
+    rows = shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + second.snapshotId());
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find snapshot with ID 1234", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 1234");
+        });
+  }
+
+  @Test
+  public void testAsOfTimestampWithJoins() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(4);
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "' fv, " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv " +
+        "WHERE fv.first_name=sv.first_name");
+
+    Assert.assertEquals(4, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv, " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' tv " +
+         "WHERE sv.first_name=tv.first_name");
+
+    Assert.assertEquals(8, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' sv, " +
+        "customers lv " +
+        "WHERE sv.first_name=lv.first_name");
+
+    Assert.assertEquals(14, rows.size());
+  }
+
+  /**
+   * Creates the 'customers' table with the default records and creates extra snapshots by inserting one more line
+   * into the table.
+   * @param versions The number of snapshots we want to create
+   * @return The table created
+   * @throws IOException When there is a problem during table creation
+   * @throws InterruptedException When there is a problem during adding new data to the table
+   */
+  private Table prepareTableWithVersions(int versions) throws IOException, InterruptedException {
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    for (int i = 0; i < versions - 1; ++i) {
+      // Just wait a little so we definitely will not have the same timestamp for the snapshots
+      Thread.sleep(100);
+      shell.executeStatement("INSERT INTO customers values(" +
+          (i + HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size()) + ",'Alice','Green_" + i + "')");
+    }
+
+    table.refresh();
+
+    return table;
+  }
+
+  /**
+   * Get the timestamp string which we can use in the queries. The timestamp will be after the given snapshot
+   * and before the next one
+   * @param table The table which we want to query
+   * @param snapshotPosition The position of the last snapshot we want to see in the query results
+   * @return The timestamp which we can use in the queries
+   */
+  private String timestampAfterSnapshot(Table table, int snapshotPosition) {
+    List<HistoryEntry> history = table.history();
+    long snapshotTime = history.get(snapshotPosition).timestampMillis();
+    long time = snapshotTime + 100;
+    if (history.size() > snapshotPosition + 1) {
+      time = snapshotTime + ((history.get(snapshotPosition + 1).timestampMillis() - snapshotTime) / 2);
+    }
+
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");

Review comment:
       Also, maybe one more test where we insert into a table using time travel? Not sure it adds value, but I can imagine CTAS being used w/ time travel frequently




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


