Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/04/28 07:45:57 UTC

[iceberg] branch master updated: Hive: fix issue of inserting empty data on Tez (#2516)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 90bc329  Hive: fix issue of inserting empty data on Tez (#2516)
90bc329 is described below

commit 90bc3297fbde62150281698ad5b61dc0dfb2be9c
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Wed Apr 28 09:45:09 2021 +0200

    Hive: fix issue of inserting empty data on Tez (#2516)
---
 .../mr/hive/HiveIcebergOutputCommitter.java        | 29 ++++++++++++++++------
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 16 ++++++++++++
 2 files changed, 38 insertions(+), 7 deletions(-)
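
For readers skimming the diff below: the core of the fix is a null-to-empty-map
fallback around the per-attempt writer lookup, since a Tez task whose INSERT
selects zero rows never registers a writer, and HiveIcebergRecordWriter.getWriters()
then returns null. A minimal standalone sketch of that pattern (the getWriters
stand-in here is hypothetical, not the Iceberg API):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;

    public class NullSafeWriterLookup {

      // Hypothetical stand-in for HiveIcebergRecordWriter.getWriters(attemptID),
      // which returns null when the task attempt never opened a writer
      // (e.g. an INSERT whose SELECT produced zero rows on Tez).
      private static Map<String, String> getWriters(String attemptId) {
        return null; // simulate the empty-insert case
      }

      public static void main(String[] args) {
        // The pattern used by the fix: fall back to an immutable empty map so
        // downstream per-table lookups return null instead of the map access
        // itself throwing a NullPointerException.
        Map<String, String> writers = Optional.ofNullable(getWriters("attempt_1"))
            .orElseGet(Collections::emptyMap);

        System.out.println("writer for 'target': " + writers.get("target")); // null, no NPE
      }
    }

Because orElseGet is lazy, the fallback (and the log line in the real patch)
only runs when the lookup actually comes back null.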

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index a7d7891..a6ea8e2 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -25,6 +25,7 @@ import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -50,6 +51,7 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -92,8 +94,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
 
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
-    Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.getWriters(attemptID);
     Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
+    Map<String, HiveIcebergRecordWriter> writers = Optional.ofNullable(HiveIcebergRecordWriter.getWriters(attemptID))
+        .orElseGet(() -> {
+          LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
+          return ImmutableMap.of();
+        });
 
     ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
     try {
@@ -107,11 +113,16 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
             Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
             if (table != null) {
               HiveIcebergRecordWriter writer = writers.get(output);
-              DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
+              DataFile[] closedFiles;
+              if (writer != null) {
+                closedFiles = writer.dataFiles();
+              } else {
+                LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
+                closedFiles = new DataFile[0];
+              }
+              // Creating the file containing the data files generated by this task for this table
               String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
                       attemptID.getJobID(), attemptID.getTaskID().getId());
-
-              // Creating the file containing the data files generated by this task for this table
               createFileForCommit(closedFiles, fileForCommitLocation, table.io());
             } else {
               // When using Tez multi-table inserts, we could have more output tables in config than
@@ -176,9 +187,13 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           .executeWith(tableExecutor)
           .run(output -> {
             Table table = HiveIcebergStorageHandler.table(jobConf, output);
-            String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
-            jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
-            commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
+            if (table != null) {
+              String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
+              jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
+              commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
+            } else {
+              LOG.info("CommitJob found no serialized table in config for table: {}. Skipping job commit.", output);
+            }
           });
     } finally {
       fileExecutor.shutdown();
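
The commitJob change above applies the same defensive idea at job level: with
Tez multi-table inserts, the job config can name more output tables than were
actually serialized into it, so a null table lookup is now logged and skipped
instead of dereferenced. A small self-contained sketch of that skip pattern
(the map and table names are illustrative stand-ins, not Iceberg APIs):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SkipMissingTables {
      public static void main(String[] args) {
        // Illustrative stand-in for the job config: only one of the two
        // configured output tables was actually serialized.
        Map<String, String> serializedTables = new HashMap<>();
        serializedTables.put("db.target", "table-metadata");

        List<String> outputs = Arrays.asList("db.target", "db.other");
        for (String output : outputs) {
          String table = serializedTables.get(output);
          if (table != null) {
            System.out.println("Committing output table: " + output);
          } else {
            // Mirrors the patched commitJob branch: log and skip instead of
            // dereferencing null
            System.out.println("No serialized table in config for: " + output + ". Skipping job commit.");
          }
        }
      }
    }
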
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 972fcf5..48bd181 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -705,6 +705,22 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     Assert.assertEquals("Linda", results.get(0)[1]);
   }
 
+  @Test
+  public void testInsertEmptyResultSet() throws IOException {
+    Table source = testTables.createTable(shell, "source", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            fileFormat, ImmutableList.of());
+    Table target = testTables.createTable(shell, "target", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            fileFormat, ImmutableList.of());
+
+    shell.executeStatement("INSERT INTO target SELECT * FROM source");
+    HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+
+    testTables.appendIcebergTable(shell.getHiveConf(), source, fileFormat, null,
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    shell.executeStatement("INSERT INTO target SELECT * FROM source WHERE first_name = 'Nobody'");
+    HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+  }
+
   private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
     String tableName = "complex_table";
     Table table = testTables.createTable(shell, "complex_table", schema, fileFormat, ImmutableList.of());
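
The new testInsertEmptyResultSet test above exercises both empty-insert paths:
inserting from an empty source table, and inserting with a predicate that
matches no rows. As context, here is a minimal sketch of how an "empty table"
assertion can be written against Iceberg's generic reader API (the assertEmpty
helper is hypothetical; HiveIcebergTestUtils.validateData used in the test
performs a fuller comparison):

    import java.io.IOException;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.IcebergGenerics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;
    import org.junit.Assert;

    public class EmptyTableCheck {
      // Hypothetical helper: assert that an Iceberg table holds no rows.
      // `table` is assumed to be a valid handle obtained from a catalog.
      static void assertEmpty(Table table) throws IOException {
        try (CloseableIterable<Record> rows = IcebergGenerics.read(table).build()) {
          Assert.assertFalse("Expected no rows in table", rows.iterator().hasNext());
        }
      }
    }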