You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/02/19 10:28:55 UTC
[iceberg] branch master updated: Hive: enable inserting data from joins (#2251)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cea6498 Hive: enable inserting data from joins (#2251)
cea6498 is described below
commit cea6498352e4de3bbe62fc3218cb06abd01ce3de
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Fri Feb 19 11:28:42 2021 +0100
Hive: enable inserting data from joins (#2251)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 6 +++++-
.../hive/TestHiveIcebergStorageHandlerWithEngine.java | 19 +++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 955fa8c..10a1e27 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -95,7 +95,11 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
overlayTableProperties(conf, tableDesc, map);
- // Putting the key into the table props and not the map, so that projection pushdown can be determined on a
+ // For Tez, setting the committer here is enough to make sure it'll be part of the jobConf
+ map.put("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
+ // For MR, the jobConf is set only in configureJobConf, so we're setting the write key here to detect it over there
+ map.put(WRITE_KEY, "true");
+ // Putting the key into the table props as well, so that projection pushdown can be determined on a
// table-level and skipped only for output tables in HiveIcebergSerde. Properties from the map will be present in
// the serde config for all tables in the query, not just the output tables, so we can't rely on that in the serde.
tableDesc.getProperties().put(WRITE_KEY, "true");
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index c115fb3..ff120a3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -422,6 +422,25 @@ public class TestHiveIcebergStorageHandlerWithEngine {
}
@Test
+ public void testInsertFromJoiningTwoIcebergTables() throws IOException {
+ Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+ PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .identity("last_name").build();
+ testTables.createTable(shell, "source_customers_1", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ testTables.createTable(shell, "source_customers_2", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ Table table = testTables.createTable(shell, "target_customers",
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, ImmutableList.of());
+
+ shell.executeStatement("INSERT INTO target_customers SELECT a.customer_id, b.first_name, a.last_name FROM " +
+ "source_customers_1 a JOIN source_customers_2 b ON a.last_name = b.last_name");
+
+ HiveIcebergTestUtils.validateData(table, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 0);
+ }
+
+ @Test
public void testWriteArrayOfPrimitivesInTable() throws IOException {
Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
Schema schema = new Schema(required(1, "id", Types.LongType.get()),