You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/18 23:40:20 UTC

[iceberg] branch master updated: MR: Support imported data in InputFormat using name mapping (#3312)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e72b5  MR: Support imported data in InputFormat using name mapping (#3312)
34e72b5 is described below

commit 34e72b55d68a90d82f9349796b1e2c0667892820
Author: Edgar Rodriguez <ed...@airbnb.com>
AuthorDate: Mon Oct 18 19:40:11 2021 -0400

    MR: Support imported data in InputFormat using name mapping (#3312)
---
 .../org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index a045c3a..2666845 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -66,6 +66,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
@@ -191,6 +192,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     private TaskAttemptContext context;
     private Schema tableSchema;
     private Schema expectedSchema;
+    private String nameMapping;
     private boolean reuseContainers;
     private boolean caseSensitive;
     private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
@@ -211,6 +213,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       this.encryptionManager = table.encryption();
       this.tasks = task.files().iterator();
       this.tableSchema = InputFormatConfig.tableSchema(conf);
+      this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, tableSchema, caseSensitive);
       this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
@@ -326,6 +329,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       if (reuseContainers) {
         avroReadBuilder.reuseContainers();
       }
+      if (nameMapping != null) {
+        avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
 
       switch (inMemoryDataModel) {
         case PIG:
@@ -350,6 +356,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       if (reuseContainers) {
         parquetReadBuilder.reuseContainers();
       }
+      if (nameMapping != null) {
+        parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
 
       switch (inMemoryDataModel) {
         case PIG:
@@ -391,6 +400,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           orcReadBuilder.createReaderFunc(
               fileSchema -> GenericOrcReader.buildReader(
                   readSchema, fileSchema, idToConstant));
+
+          if (nameMapping != null) {
+            orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
           orcIterator = orcReadBuilder.build();
       }