You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/02/18 10:35:21 UTC
[iceberg] branch master updated: Hive: skip projection pushdown for output tables (#2246)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 04f4a30 Hive: skip projection pushdown for output tables (#2246)
04f4a30 is described below
commit 04f4a309240ef52faaebe4eef5820a682906b780
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Thu Feb 18 11:35:12 2021 +0100
Hive: skip projection pushdown for output tables (#2246)
---
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 28 +++++++++++++---------
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 11 +++++----
.../TestHiveIcebergStorageHandlerWithEngine.java | 26 ++++++++++++++++++++
3 files changed, 50 insertions(+), 15 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 851be83..0dd3c7f 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -88,18 +88,24 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
}
- configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
- String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
- // When same table is joined multiple times, it is possible some selected columns are duplicated,
- // in this case wrong recordStructField position leads wrong value or ArrayIndexOutOfBoundException
- String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
- Schema projectedSchema = distinctSelectedColumns.length > 0 ?
- tableSchema.caseInsensitiveSelect(distinctSelectedColumns) : tableSchema;
- // the input split mapper handles does not belong to this table
- // it is necessary to ensure projectedSchema equals to tableSchema,
- // or we cannot find selectOperator's column from inspector
- if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
+ Schema projectedSchema;
+ if (serDeProperties.get(HiveIcebergStorageHandler.WRITE_KEY) != null) {
+ // when writing out data, we should not do projection pushdown
projectedSchema = tableSchema;
+ } else {
+ configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
+ String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
+ // When same table is joined multiple times, it is possible some selected columns are duplicated,
+ // in this case wrong recordStructField position leads wrong value or ArrayIndexOutOfBoundException
+ String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
+ projectedSchema = distinctSelectedColumns.length > 0 ?
+ tableSchema.caseInsensitiveSelect(distinctSelectedColumns) : tableSchema;
+ // the input split mapper handles does not belong to this table
+ // it is necessary to ensure projectedSchema equals to tableSchema,
+ // or we cannot find selectOperator's column from inspector
+ if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
+ projectedSchema = tableSchema;
+ }
}
try {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index aa5b9db..955fa8c 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -58,7 +58,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
- private static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
+ static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
private Configuration conf;
@@ -95,7 +95,10 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
overlayTableProperties(conf, tableDesc, map);
- map.put(WRITE_KEY, "true");
+ // Putting the key into the table props and not the map, so that projection pushdown can be determined on a
+ // table-level and skipped only for output tables in HiveIcebergSerde. Properties from the map will be present in
+ // the serde config for all tables in the query, not just the output tables, so we can't rely on that in the serde.
+ tableDesc.getProperties().put(WRITE_KEY, "true");
}
@Override
@@ -111,8 +114,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
- if (tableDesc != null && tableDesc.getJobProperties() != null &&
- tableDesc.getJobProperties().get(WRITE_KEY) != null) {
+ if (tableDesc != null && tableDesc.getProperties() != null &&
+ tableDesc.getProperties().get(WRITE_KEY) != null) {
jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
}
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index f74039b..c115fb3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.After;
@@ -396,6 +397,31 @@ public class TestHiveIcebergStorageHandlerWithEngine {
}
@Test
+ public void testInsertUsingSourceTableWithSharedColumnsNames() throws IOException {
+ Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+ List<Record> records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS;
+ PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .identity("last_name").build();
+ testTables.createTable(shell, "source_customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat, records);
+ Table table = testTables.createTable(shell, "target_customers",
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, ImmutableList.of());
+
+ // Below select from source table should produce: "hive.io.file.readcolumn.names=customer_id,last_name".
+ // Inserting into the target table should not fail because first_name is not selected from the source table
+ shell.executeStatement("INSERT INTO target_customers SELECT customer_id, 'Sam', last_name FROM source_customers");
+
+ List<Record> expected = Lists.newArrayListWithExpectedSize(records.size());
+ records.forEach(r -> {
+ Record copy = r.copy();
+ copy.setField("first_name", "Sam");
+ expected.add(copy);
+ });
+ HiveIcebergTestUtils.validateData(table, expected, 0);
+ }
+
+ @Test
public void testWriteArrayOfPrimitivesInTable() throws IOException {
Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
Schema schema = new Schema(required(1, "id", Types.LongType.get()),