You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/04/26 15:00:23 UTC
[iceberg] branch master updated: Hive: Fix multi-table insert issue with Tez (#2502)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4f271bd Hive: Fix multi-table insert issue with Tez (#2502)
4f271bd is described below
commit 4f271bd3a4478ec802fe256e512635aea576cf0e
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Mon Apr 26 17:00:11 2021 +0200
Hive: Fix multi-table insert issue with Tez (#2502)
---
.../iceberg/mr/hive/HiveIcebergOutputCommitter.java | 20 +++++++++++++-------
.../TestHiveIcebergStorageHandlerWithEngine.java | 18 ++++++++++++++++--
.../java/org/apache/iceberg/mr/hive/TestTables.java | 9 +++++++++
3 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index bf298ad..a7d7891 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -105,13 +105,19 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
.executeWith(tableExecutor)
.run(output -> {
Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
- HiveIcebergRecordWriter writer = writers.get(output);
- DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
- String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
- attemptID.getJobID(), attemptID.getTaskID().getId());
-
- // Creating the file containing the data files generated by this task for this table
- createFileForCommit(closedFiles, fileForCommitLocation, table.io());
+ if (table != null) {
+ HiveIcebergRecordWriter writer = writers.get(output);
+ DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
+ String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
+ attemptID.getJobID(), attemptID.getTaskID().getId());
+
+ // Creating the file containing the data files generated by this task for this table
+ createFileForCommit(closedFiles, fileForCommitLocation, table.io());
+ } else {
+ // When using Tez multi-table inserts, we could have more output tables in config than
+ // the actual tables this task has written to and has serialized in its config
+ LOG.info("CommitTask found no serialized table in config for table: {}.", output);
+ }
}, IOException.class);
} finally {
if (tableExecutor != null) {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index aca5c04..972fcf5 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -659,9 +659,23 @@ public class TestHiveIcebergStorageHandlerWithEngine {
Table target1 = testTables.createTable(shell, "target1", target1Schema, fileFormat, ImmutableList.of());
Table target2 = testTables.createTable(shell, "target2", target2Schema, fileFormat, ImmutableList.of());
+ // simple insert: should create a single vertex writing to both target tables
shell.executeStatement("FROM customers " +
- "INSERT INTO target1 SELECT customer_id, first_name " +
- "INSERT INTO target2 SELECT last_name, customer_id");
+ "INSERT INTO target1 SELECT customer_id, first_name " +
+ "INSERT INTO target2 SELECT last_name, customer_id");
+
+ // Check that everything is as expected
+ HiveIcebergTestUtils.validateData(target1, target1Records, 0);
+ HiveIcebergTestUtils.validateData(target2, target2Records, 1);
+
+ // truncate the target tables
+ testTables.truncateIcebergTable(target1);
+ testTables.truncateIcebergTable(target2);
+
+ // complex insert: should use a different vertex for each target table
+ shell.executeStatement("FROM customers " +
+ "INSERT INTO target1 SELECT customer_id, first_name ORDER BY first_name " +
+ "INSERT INTO target2 SELECT last_name, customer_id ORDER BY last_name");
// Check that everything is as expected
HiveIcebergTestUtils.validateData(target1, target1Records, 0);
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4d3a60e..c282360 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.Tables;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
@@ -275,6 +276,14 @@ abstract class TestTables {
}
}
+ /**
+ * Truncates an Iceberg table.
+ * @param table The iceberg table to truncate
+ */
+ public void truncateIcebergTable(Table table) {
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+ }
+
private static class CatalogToTables implements Tables {
private final Catalog catalog;