You are viewing a plain text version of this content. The canonical link for it is available in the original HTML version of this message.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/04/28 07:45:57 UTC
[iceberg] branch master updated: Hive: fix issue of inserting empty
data on Tez (#2516)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 90bc329 Hive: fix issue of inserting empty data on Tez (#2516)
90bc329 is described below
commit 90bc3297fbde62150281698ad5b61dc0dfb2be9c
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Wed Apr 28 09:45:09 2021 +0200
Hive: fix issue of inserting empty data on Tez (#2516)
---
.../mr/hive/HiveIcebergOutputCommitter.java | 29 ++++++++++++++++------
.../TestHiveIcebergStorageHandlerWithEngine.java | 16 ++++++++++++
2 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index a7d7891..a6ea8e2 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -25,6 +25,7 @@ import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@@ -50,6 +51,7 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -92,8 +94,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptID attemptID = context.getTaskAttemptID();
JobConf jobConf = context.getJobConf();
- Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.getWriters(attemptID);
Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
+ Map<String, HiveIcebergRecordWriter> writers = Optional.ofNullable(HiveIcebergRecordWriter.getWriters(attemptID))
+ .orElseGet(() -> {
+ LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
+ return ImmutableMap.of();
+ });
ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
try {
@@ -107,11 +113,16 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
if (table != null) {
HiveIcebergRecordWriter writer = writers.get(output);
- DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
+ DataFile[] closedFiles;
+ if (writer != null) {
+ closedFiles = writer.dataFiles();
+ } else {
+ LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
+ closedFiles = new DataFile[0];
+ }
+ // Creating the file containing the data files generated by this task for this table
String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
attemptID.getJobID(), attemptID.getTaskID().getId());
-
- // Creating the file containing the data files generated by this task for this table
createFileForCommit(closedFiles, fileForCommitLocation, table.io());
} else {
// When using Tez multi-table inserts, we could have more output tables in config than
@@ -176,9 +187,13 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
.executeWith(tableExecutor)
.run(output -> {
Table table = HiveIcebergStorageHandler.table(jobConf, output);
- String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
- jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
- commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
+ if (table != null) {
+ String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
+ jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
+ commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
+ } else {
+ LOG.info("CommitJob found no serialized table in config for table: {}. Skipping job commit.", output);
+ }
});
} finally {
fileExecutor.shutdown();
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 972fcf5..48bd181 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -705,6 +705,22 @@ public class TestHiveIcebergStorageHandlerWithEngine {
Assert.assertEquals("Linda", results.get(0)[1]);
}
+ @Test
+ public void testInsertEmptyResultSet() throws IOException {
+ Table source = testTables.createTable(shell, "source", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, ImmutableList.of());
+ Table target = testTables.createTable(shell, "target", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, ImmutableList.of());
+
+ shell.executeStatement("INSERT INTO target SELECT * FROM source");
+ HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+
+ testTables.appendIcebergTable(shell.getHiveConf(), source, fileFormat, null,
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ shell.executeStatement("INSERT INTO target SELECT * FROM source WHERE first_name = 'Nobody'");
+ HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+ }
+
private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
String tableName = "complex_table";
Table table = testTables.createTable(shell, "complex_table", schema, fileFormat, ImmutableList.of());