You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/02/18 10:35:21 UTC

[iceberg] branch master updated: Hive: skip projection pushdown for output tables (#2246)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 04f4a30  Hive: skip projection pushdown for output tables (#2246)
04f4a30 is described below

commit 04f4a309240ef52faaebe4eef5820a682906b780
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Thu Feb 18 11:35:12 2021 +0100

    Hive: skip projection pushdown for output tables (#2246)
---
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   | 28 +++++++++++++---------
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 11 +++++----
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 26 ++++++++++++++++++++
 3 files changed, 50 insertions(+), 15 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 851be83..0dd3c7f 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -88,18 +88,24 @@ public class HiveIcebergSerDe extends AbstractSerDe {
       }
     }
 
-    configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
-    String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
-    // When same table is joined multiple times, it is possible some selected columns are duplicated,
-    // in this case wrong recordStructField position leads wrong value or ArrayIndexOutOfBoundException
-    String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
-    Schema projectedSchema = distinctSelectedColumns.length > 0 ?
-            tableSchema.caseInsensitiveSelect(distinctSelectedColumns) : tableSchema;
-    // the input split mapper handles does not belong to this table
-    // it is necessary to ensure projectedSchema equals to tableSchema,
-    // or we cannot find selectOperator's column from inspector
-    if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
+    Schema projectedSchema;
+    if (serDeProperties.get(HiveIcebergStorageHandler.WRITE_KEY) != null) {
+      // when writing out data, we should not do projection pushdown
       projectedSchema = tableSchema;
+    } else {
+      configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
+      String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
+      // When the same table is joined multiple times, some selected columns may be duplicated;
+      // in that case a wrong recordStructField position leads to wrong values or an ArrayIndexOutOfBoundsException
+      String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
+      projectedSchema = distinctSelectedColumns.length > 0 ?
+              tableSchema.caseInsensitiveSelect(distinctSelectedColumns) : tableSchema;
+      // If the input split that the mapper handles does not belong to this table,
+      // it is necessary to ensure that projectedSchema equals tableSchema,
+      // otherwise we cannot find the selectOperator's columns from the inspector
+      if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
+        projectedSchema = tableSchema;
+      }
     }
 
     try {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index aa5b9db..955fa8c 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -58,7 +58,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
 
 public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
 
-  private static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
+  static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
 
   private Configuration conf;
 
@@ -95,7 +95,10 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   @Override
   public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
     overlayTableProperties(conf, tableDesc, map);
-    map.put(WRITE_KEY, "true");
+    // Put the key into the table props and not the map, so that projection pushdown can be determined at the
+    // table level and skipped only for output tables in HiveIcebergSerDe. Properties from the map will be present in
+    // the serde config for all tables in the query, not just the output tables, so we can't rely on that in the serde.
+    tableDesc.getProperties().put(WRITE_KEY, "true");
   }
 
   @Override
@@ -111,8 +114,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
 
   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
-    if (tableDesc != null && tableDesc.getJobProperties() != null &&
-        tableDesc.getJobProperties().get(WRITE_KEY) != null) {
+    if (tableDesc != null && tableDesc.getProperties() != null &&
+        tableDesc.getProperties().get(WRITE_KEY) != null) {
       jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
     }
   }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index f74039b..c115fb3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
@@ -396,6 +397,31 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   }
 
   @Test
+  public void testInsertUsingSourceTableWithSharedColumnsNames() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    List<Record> records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS;
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("last_name").build();
+    testTables.createTable(shell, "source_customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, records);
+    Table table = testTables.createTable(shell, "target_customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, ImmutableList.of());
+
+    // Below select from source table should produce: "hive.io.file.readcolumn.names=customer_id,last_name".
+    // Inserting into the target table should not fail because first_name is not selected from the source table
+    shell.executeStatement("INSERT INTO target_customers SELECT customer_id, 'Sam', last_name FROM source_customers");
+
+    List<Record> expected = Lists.newArrayListWithExpectedSize(records.size());
+    records.forEach(r -> {
+      Record copy = r.copy();
+      copy.setField("first_name", "Sam");
+      expected.add(copy);
+    });
+    HiveIcebergTestUtils.validateData(table, expected, 0);
+  }
+
+  @Test
   public void testWriteArrayOfPrimitivesInTable() throws IOException {
     Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
     Schema schema = new Schema(required(1, "id", Types.LongType.get()),