You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this text version.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/01/02 18:15:34 UTC
[hive] branch master updated: HIVE-26868: Iceberg: Allow IOW on empty table with Partition Evolution. (#3872). (Ayush Saxena, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 14ccddee751 HIVE-26868: Iceberg: Allow IOW on empty table with Partition Evolution. (#3872). (Ayush Saxena, reviewed by Denys Kuzmenko)
14ccddee751 is described below
commit 14ccddee75178ff560a44c24a8f2b7b4766dede0
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Mon Jan 2 23:45:21 2023 +0530
HIVE-26868: Iceberg: Allow IOW on empty table with Partition Evolution. (#3872). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 7 +-
.../iceberg/mr/hive/TestHiveIcebergInserts.java | 103 +++++++++++++++++++++
2 files changed, 109 insertions(+), 1 deletion(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 2241ded4719..4b225e4719b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -582,6 +582,11 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
HiveStorageHandler.super.validateSinkDesc(sinkDesc);
if (sinkDesc.getInsertOverwrite()) {
Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
+ if (table.currentSnapshot() != null &&
+ Long.parseLong(table.currentSnapshot().summary().get(SnapshotSummary.TOTAL_RECORDS_PROP)) == 0) {
+ // If the table is empty we don't have any danger that some data can get lost.
+ return;
+ }
if (IcebergTableUtil.isBucketed(table)) {
throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
}
@@ -590,7 +595,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
.anyMatch(id -> id < table.spec().specId())) {
throw new SemanticException(
"Cannot perform insert overwrite query on Iceberg table where partition evolution happened. In order " +
- "to succesfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
+ "to successfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
"conforming to the latest spec. ");
}
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 675677be8a5..3fd2c240fe7 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.TestHelper;
@@ -504,4 +507,104 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB
shell.executeStatement("INSERT INTO target SELECT * FROM source WHERE first_name = 'Nobody'");
HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
}
+
+ @Test
+ public void testInsertOverwriteOnEmptyV1Table() throws IOException {
+ TableIdentifier target = TableIdentifier.of("default", "target");
+ Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+ // Insert some data.
+ List<Record> newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(0L, "ABC", "DBAK")
+ .add(1L, "XYZ", "RDBS")
+ .build();
+ shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+ // Change the partition and then insert some more data.
+ shell.executeStatement("ALTER TABLE target SET PARTITION SPEC(TRUNCATE(2, last_name))");
+ newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(2L, "Mike", "Taylor")
+ .add(3L, "Christy", "Hubert")
+ .build();
+
+ shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+ // Truncate the table
+ shell.executeStatement("TRUNCATE TABLE target");
+
+ newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(0L, "Mike", "Taylor")
+ .add(3L, "ABC", "DBAK")
+ .add(4L, "APL", "DBAM")
+ .build();
+
+ // Do an insert-overwrite and this should be successful because the table is empty.
+ shell.executeStatement(testTables.getInsertQuery(newRecords, target, true));
+
+ HiveIcebergTestUtils.validateData(table, newRecords, 0);
+ }
+
+ @Test
+ public void testInsertOverwriteOnEmptyV2Table() throws IOException {
+ // Create a V2 table with merge-on-read.
+ TableIdentifier target = TableIdentifier.of("default", "target");
+ Map<String, String> opTypes = ImmutableMap.of(
+ TableProperties.DELETE_MODE, "merge-on-read",
+ TableProperties.MERGE_MODE, "merge-on-read",
+ TableProperties.UPDATE_MODE, "merge-on-read");
+
+ Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2, opTypes);
+
+ // Insert some data.
+ List<Record> newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(3L, "ABC", "DBAK")
+ .add(4L, "XYZ", "RDBS")
+ .add(5L, "CBO", "HIVE")
+ .add(6L, "HADOOP", "HDFS")
+ .build();
+ shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+ // Perform some deletes & updates.
+ shell.executeStatement("update target set first_name='WXYZ' where customer_id=1");
+ shell.executeStatement("delete from target where customer_id%2=0");
+
+ // Change the partition and then insert some more data.
+ shell.executeStatement("ALTER TABLE target SET PARTITION SPEC(TRUNCATE(2, last_name))");
+ newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(7L, "Mike", "Taylor")
+ .add(8L, "Christy", "Hubert")
+ .add(9L, "RDBMS", "Talk")
+ .add(10L, "Notification", "Hub")
+ .add(11L, "Vector", "HDFS")
+ .build();
+ shell.executeStatement(testTables.getInsertQuery(newRecords, target, false));
+
+ // Perform some deletes & updates.
+ shell.executeStatement("update target set first_name='RDBMSV2' where customer_id=9");
+ shell.executeStatement("delete from target where customer_id%2=0 AND customer_id>6");
+
+ Table icebergTable = testTables.loadTable(target);
+ // There should be some delete files, due to our delete & update operations.
+ Assert.assertNotEquals("0", icebergTable.currentSnapshot().summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
+
+ long preTruncateSnapshotId = icebergTable.currentSnapshot().snapshotId();
+ List<Object[]> result =
+ shell.executeStatement(String.format("SELECT * FROM %s order by customer_id", target.name()));
+
+ // Truncate the table
+ shell.executeStatement("TRUNCATE TABLE target");
+
+ // Do an insert-overwrite and this should be successful because the table is empty.
+ shell.executeStatement(
+ "INSERT OVERWRITE TABLE target select * from target FOR SYSTEM_VERSION AS OF " + preTruncateSnapshotId);
+
+ HiveIcebergTestUtils.validateData(table,
+ HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, result), 0);
+ icebergTable = testTables.loadTable(target);
+
+ // There should be no delete files, they should have been merged with the data files
+ Assert.assertEquals("0", icebergTable.currentSnapshot().summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
+ }
}