Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/04/26 15:00:23 UTC

[iceberg] branch master updated: Hive: Fix multi-table insert issue with Tez (#2502)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f271bd  Hive: Fix multi-table insert issue with Tez (#2502)
4f271bd is described below

commit 4f271bd3a4478ec802fe256e512635aea576cf0e
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Mon Apr 26 17:00:11 2021 +0200

    Hive: Fix multi-table insert issue with Tez (#2502)
---
 .../iceberg/mr/hive/HiveIcebergOutputCommitter.java  | 20 +++++++++++++-------
 .../TestHiveIcebergStorageHandlerWithEngine.java     | 18 ++++++++++++++++--
 .../java/org/apache/iceberg/mr/hive/TestTables.java  |  9 +++++++++
 3 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index bf298ad..a7d7891 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -105,13 +105,19 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           .executeWith(tableExecutor)
           .run(output -> {
             Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
-            HiveIcebergRecordWriter writer = writers.get(output);
-            DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
-            String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-
-            // Creating the file containing the data files generated by this task for this table
-            createFileForCommit(closedFiles, fileForCommitLocation, table.io());
+            if (table != null) {
+              HiveIcebergRecordWriter writer = writers.get(output);
+              DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
+              String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
+                      attemptID.getJobID(), attemptID.getTaskID().getId());
+
+              // Creating the file containing the data files generated by this task for this table
+              createFileForCommit(closedFiles, fileForCommitLocation, table.io());
+            } else {
+              // With Tez multi-table inserts, the job config can name more output tables than the
+              // tables this task actually wrote to and serialized into its own config
+              LOG.info("CommitTask found no serialized table in config for table: {}.", output);
+            }
           }, IOException.class);
     } finally {
       if (tableExecutor != null) {
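
Restated outside the diff: the hunk above makes commitTask tolerate output
tables that the task never wrote, instead of calling table.location() on a
null table. A minimal sketch of the patched loop body (simplified from the
diff; outputTables, jobConf, writers and attemptID stand in for the
committer's actual state, and the two helper calls are the ones referenced
above):

    for (String output : outputTables) {
      // The job config lists every output table of the query, but a Tez task
      // may have serialized only the table(s) its own vertex wrote to
      Table table = HiveIcebergStorageHandler.table(jobConf, output);
      if (table == null) {
        // This task wrote nothing for this output, so there is no commit file to create
        LOG.info("CommitTask found no serialized table in config for table: {}.", output);
        continue;
      }
      HiveIcebergRecordWriter writer = writers.get(output);
      DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
      String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
          attemptID.getJobID(), attemptID.getTaskID().getId());
      // Persist the list of data files this task produced for this table
      createFileForCommit(closedFiles, fileForCommitLocation, table.io());
    }
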
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index aca5c04..972fcf5 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -659,9 +659,23 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     Table target1 = testTables.createTable(shell, "target1", target1Schema, fileFormat, ImmutableList.of());
     Table target2 = testTables.createTable(shell, "target2", target2Schema, fileFormat, ImmutableList.of());
 
+    // simple insert: should create a single vertex writing to both target tables
     shell.executeStatement("FROM customers " +
-            "INSERT INTO target1 SELECT customer_id, first_name " +
-            "INSERT INTO target2 SELECT last_name, customer_id");
+        "INSERT INTO target1 SELECT customer_id, first_name " +
+        "INSERT INTO target2 SELECT last_name, customer_id");
+
+    // Check that everything is as expected
+    HiveIcebergTestUtils.validateData(target1, target1Records, 0);
+    HiveIcebergTestUtils.validateData(target2, target2Records, 1);
+
+    // truncate the target tables
+    testTables.truncateIcebergTable(target1);
+    testTables.truncateIcebergTable(target2);
+
+    // complex insert: should use a different vertex for each target table
+    shell.executeStatement("FROM customers " +
+        "INSERT INTO target1 SELECT customer_id, first_name ORDER BY first_name " +
+        "INSERT INTO target2 SELECT last_name, customer_id ORDER BY last_name");
 
     // Check that everything is as expected
     HiveIcebergTestUtils.validateData(target1, target1Records, 0);
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4d3a60e..c282360 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.Tables;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -275,6 +276,14 @@ abstract class TestTables {
     }
   }
 
+  /**
+   * Truncates an Iceberg table.
+   * @param table The Iceberg table to truncate
+   */
+  public void truncateIcebergTable(Table table) {
+    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+  }
+
   private static class CatalogToTables implements Tables {
 
     private final Catalog catalog;
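
The new truncateIcebergTable helper is a thin wrapper over Iceberg's delete
API. A minimal usage sketch (a standalone method; the Table handle is
assumed to be a loaded Iceberg table, as in the tests above):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;

    // Deleting with an always-true row filter removes every row. In Iceberg
    // this is a metadata operation: the new snapshot stops referencing the
    // matching data files instead of rewriting them.
    static void truncate(Table table) {
      table.newDelete()
          .deleteFromRowFilter(Expressions.alwaysTrue())
          .commit();
    }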