Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/02/19 10:28:55 UTC

[iceberg] branch master updated: Hive: enable inserting data from joins (#2251)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cea6498  Hive: enable inserting data from joins (#2251)
cea6498 is described below

commit cea6498352e4de3bbe62fc3218cb06abd01ce3de
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Fri Feb 19 11:28:42 2021 +0100

    Hive: enable inserting data from joins (#2251)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java    |  6 +++++-
 .../hive/TestHiveIcebergStorageHandlerWithEngine.java | 19 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)
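In short: the change puts the Iceberg output committer and a write marker into the output job properties, so that INSERT statements whose source is a join (rather than a single table scan) can write into an Iceberg table. An illustrative statement of the kind this enables, issued through the test shell used in the new test further down; the table names here are placeholders, not the ones from the commit:

    // Placeholder tables (orders, customers, order_totals); the actual test below
    // joins source_customers_1 and source_customers_2 into target_customers.
    shell.executeStatement("INSERT INTO order_totals " +
        "SELECT o.order_id, c.first_name, o.total " +
        "FROM orders o JOIN customers c ON o.customer_id = c.customer_id");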

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 955fa8c..10a1e27 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -95,7 +95,11 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   @Override
   public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
     overlayTableProperties(conf, tableDesc, map);
-    // Putting the key into the table props and not the map, so that projection pushdown can be determined on a
+    // For Tez, setting the committer here is enough to make sure it'll be part of the jobConf
+    map.put("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
+    // For MR, the jobConf is set only in configureJobConf, so we're setting the write key here to detect it over there
+    map.put(WRITE_KEY, "true");
+    // Putting the key into the table props as well, so that projection pushdown can be determined on a
     // table-level and skipped only for output tables in HiveIcebergSerde. Properties from the map will be present in
     // the serde config for all tables in the query, not just the output tables, so we can't rely on that in the serde.
     tableDesc.getProperties().put(WRITE_KEY, "true");
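Aside (not part of the patch): for the MR engine, the WRITE_KEY hint added above only has an effect if configureJobConf later reads it and installs the Iceberg committer on the JobConf. A minimal sketch of what that consumption could look like, assuming the standard HiveStorageHandler#configureJobConf(TableDesc, JobConf) hook and that the output job properties end up on the TableDesc; the real method body is not shown in this commit:

    // Sketch only -- configureJobConf is not part of this diff, so the details
    // below are an assumption about how the WRITE_KEY hint might be consumed.
    @Override
    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
      Map<String, String> jobProps = tableDesc.getJobProperties();
      // Only output tables of the query carry WRITE_KEY (set in
      // configureOutputJobProperties above), so this guards against installing
      // the committer for tables that are only read.
      if (jobProps != null && "true".equalsIgnoreCase(jobProps.get(WRITE_KEY))) {
        // Route task/job commit and abort through Iceberg instead of the
        // default FileOutputCommitter.
        jobConf.set("mapred.output.committer.class",
            HiveIcebergOutputCommitter.class.getName());
      }
    }

On Tez this extra step is not needed: as the new comment above notes, properties set in configureOutputJobProperties already make it into the jobConf, which is why the committer class is set there directly.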
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index c115fb3..ff120a3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -422,6 +422,25 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   }
 
   @Test
+  public void testInsertFromJoiningTwoIcebergTables() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+            .identity("last_name").build();
+    testTables.createTable(shell, "source_customers_1", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            spec, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    testTables.createTable(shell, "source_customers_2", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            spec, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    Table table = testTables.createTable(shell, "target_customers",
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, ImmutableList.of());
+
+    shell.executeStatement("INSERT INTO target_customers SELECT a.customer_id, b.first_name, a.last_name FROM " +
+            "source_customers_1 a JOIN source_customers_2 b ON a.last_name = b.last_name");
+
+    HiveIcebergTestUtils.validateData(table, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 0);
+  }
+
+  @Test
   public void testWriteArrayOfPrimitivesInTable() throws IOException {
     Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
     Schema schema = new Schema(required(1, "id", Types.LongType.get()),