You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/01/02 18:15:34 UTC

[hive] branch master updated: HIVE-26868: Iceberg: Allow IOW on empty table with Partition Evolution. (#3872). (Ayush Saxena, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 14ccddee751 HIVE-26868: Iceberg: Allow IOW on empty table with Partition Evolution. (#3872). (Ayush Saxena, reviewed by Denys Kuzmenko)
14ccddee751 is described below

commit 14ccddee75178ff560a44c24a8f2b7b4766dede0
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Mon Jan 2 23:45:21 2023 +0530

    HIVE-26868: Iceberg: Allow IOW on empty table with Partition Evolution. (#3872). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |   7 +-
 .../iceberg/mr/hive/TestHiveIcebergInserts.java    | 103 +++++++++++++++++++++
 2 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 2241ded4719..4b225e4719b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -582,6 +582,11 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     HiveStorageHandler.super.validateSinkDesc(sinkDesc);
     if (sinkDesc.getInsertOverwrite()) {
       Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
+      if (table.currentSnapshot() == null ||
+          "0".equals(table.currentSnapshot().summary().get(SnapshotSummary.TOTAL_RECORDS_PROP))) {
+        // Empty table (no snapshot, or total-records is 0): no data can get lost. Missing totals fall through.
+        return;
+      }
       if (IcebergTableUtil.isBucketed(table)) {
         throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
       }
@@ -590,7 +595,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
             .anyMatch(id -> id < table.spec().specId())) {
           throw new SemanticException(
               "Cannot perform insert overwrite query on Iceberg table where partition evolution happened. In order " +
-              "to succesfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
+              "to successfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
               "conforming to the latest spec. ");
         }
       }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 675677be8a5..3fd2c240fe7 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.mr.TestHelper;
@@ -504,4 +507,104 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB
     shell.executeStatement("INSERT INTO target SELECT * FROM source WHERE first_name = 'Nobody'");
     HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
   }
+
+  @Test
+  public void testInsertOverwriteOnEmptyV1Table() throws IOException {
+    TableIdentifier target = TableIdentifier.of("default", "target");
+    Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    // Verifies INSERT OVERWRITE on an emptied, partition-evolved V1 table. First, insert some data.
+    List<Record> newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(0L, "ABC", "DBAK")
+        .add(1L, "XYZ", "RDBS")
+        .build();
+    shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+    // Evolve the partition spec to TRUNCATE(2, last_name), then insert more data under the new spec.
+    shell.executeStatement("ALTER TABLE target SET PARTITION SPEC(TRUNCATE(2, last_name))");
+    newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(2L, "Mike", "Taylor")
+        .add(3L, "Christy", "Hubert")
+        .build();
+
+    shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+    // Truncate the table: it becomes empty although its history spans two partition specs.
+    shell.executeStatement("TRUNCATE TABLE target");
+
+    newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(0L, "Mike", "Taylor")
+        .add(3L, "ABC", "DBAK")
+        .add(4L, "APL", "DBAM")
+        .build();
+
+    // Partition evolution normally blocks INSERT OVERWRITE; it must succeed here because the table is empty.
+    shell.executeStatement(testTables.getInsertQuery(newRecords, target, true));
+
+    HiveIcebergTestUtils.validateData(table, newRecords, 0);
+  }
+
+  @Test
+  public void testInsertOverwriteOnEmptyV2Table() throws IOException {
+    // Verifies INSERT OVERWRITE on an emptied, partition-evolved V2 table. Create a merge-on-read V2 table.
+    TableIdentifier target = TableIdentifier.of("default", "target");
+    Map<String, String> opTypes = ImmutableMap.of(
+        TableProperties.DELETE_MODE, "merge-on-read",
+        TableProperties.MERGE_MODE, "merge-on-read",
+        TableProperties.UPDATE_MODE, "merge-on-read");
+
+    Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2, opTypes);
+
+    // Insert some data under the table's initial partition spec.
+    List<Record> newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(3L, "ABC", "DBAK")
+        .add(4L, "XYZ", "RDBS")
+        .add(5L, "CBO", "HIVE")
+        .add(6L, "HADOOP", "HDFS")
+        .build();
+    shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+    // Perform some deletes & updates; in merge-on-read mode these produce delete files (asserted below).
+    shell.executeStatement("update target set first_name='WXYZ' where customer_id=1");
+    shell.executeStatement("delete from target where customer_id%2=0");
+
+    // Evolve the partition spec to TRUNCATE(2, last_name), then insert more data under the new spec.
+    shell.executeStatement("ALTER TABLE target SET PARTITION SPEC(TRUNCATE(2, last_name))");
+    newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(7L, "Mike", "Taylor")
+        .add(8L, "Christy", "Hubert")
+        .add(9L, "RDBMS", "Talk")
+        .add(10L, "Notification", "Hub")
+        .add(11L, "Vector", "HDFS")
+        .build();
+    shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+    // More deletes & updates, this time on rows written under the new spec.
+    shell.executeStatement("update target set first_name='RDBMSV2' where customer_id=9");
+    shell.executeStatement("delete from target where customer_id%2=0 AND customer_id>6");
+
+    Table icebergTable = testTables.loadTable(target);
+    // Expect delete files from the operations above. NOTE(review): a missing summary key (null) also passes.
+    Assert.assertNotEquals("0", icebergTable.currentSnapshot().summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
+
+    long preTruncateSnapshotId = icebergTable.currentSnapshot().snapshotId();
+    List<Object[]> result =
+        shell.executeStatement(String.format("SELECT * FROM %s order by customer_id", target.name()));
+
+    // Truncate the table; the rows and snapshot id captured above serve as the expected data and IOW source.
+    shell.executeStatement("TRUNCATE TABLE target");
+
+    // INSERT OVERWRITE from the pre-truncate snapshot (time travel); allowed because the table is now empty.
+    shell.executeStatement(
+        "INSERT OVERWRITE TABLE target select * from target FOR SYSTEM_VERSION AS OF " + preTruncateSnapshotId);
+
+    HiveIcebergTestUtils.validateData(table,
+        HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, result), 0);
+    icebergTable = testTables.loadTable(target);
+
+    // No delete files should remain: the old rows were read with deletes applied and rewritten as plain data.
+    Assert.assertEquals("0", icebergTable.currentSnapshot().summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
+  }
 }