You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2022/04/14 13:22:49 UTC

[hive] branch master updated: HIVE-26133: Insert overwrite on Iceberg tables can result in duplicate entries after partition evolution (#3202) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a7ea25a72e HIVE-26133: Insert overwrite on Iceberg tables can result in duplicate entries after partition evolution (#3202) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)
a7ea25a72e is described below

commit a7ea25a72ec5334d3cac15f503b651de8200ff9c
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Thu Apr 14 15:22:42 2022 +0200

    HIVE-26133: Insert overwrite on Iceberg tables can result in duplicate entries after partition evolution (#3202) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java     | 10 ++++++++++
 .../apache/iceberg/mr/hive/TestHiveIcebergInserts.java | 18 ++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index e68458eafe..4c82eb78cd 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
@@ -460,6 +461,15 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
       if (IcebergTableUtil.isBucketed(table)) {
         throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
       }
+      if (table.currentSnapshot() != null) {
+        if (table.currentSnapshot().allManifests().parallelStream().map(ManifestFile::partitionSpecId)
+            .anyMatch(id -> id < table.spec().specId())) {
+          throw new SemanticException(
+              "Cannot perform insert overwrite query on Iceberg table where partition evolution happened. In order " +
+              "to succesfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
+              "conforming to the latest spec. ");
+        }
+      }
     }
   }
 
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 8545447cd2..f38eea1969 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -183,6 +183,24 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB
             testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, target, true)));
   }
 
+  @Test
+  public void testInsertOverwriteWithPartitionEvolutionThrowsError() throws IOException {
+    TableIdentifier target = TableIdentifier.of("default", "target");
+    Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    shell.executeStatement("ALTER TABLE target SET PARTITION SPEC(TRUNCATE(2, last_name))");
+    List<Record> newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(0L, "Mike", "Taylor")
+        .add(1L, "Christy", "Hubert")
+        .build();
+    AssertHelpers.assertThrows("IOW should not work on tables with partition evolution",
+        IllegalArgumentException.class,
+        "Cannot perform insert overwrite query on Iceberg table where partition evolution happened.",
+        () -> shell.executeStatement(testTables.getInsertQuery(newRecords, target, true)));
+    // TODO: add more test cases once Hive supports merge + compaction, since that will allow us to rewrite the
+    // data so that it conforms to the latest partition spec
+  }
+
   /**
    * Testing map-reduce inserts.
    * @throws IOException If there is an underlying IOException