You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/22 21:52:27 UTC

[iceberg] branch master updated: MR: Pass identity values via constants map in InputFormat (#1130)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new afb6a12  MR: Pass identity values via constants map in InputFormat (#1130)
afb6a12 is described below

commit afb6a12027b1eaffafd667a77ade81f7bd0180d3
Author: Ratandeep Ratti <rd...@gmail.com>
AuthorDate: Mon Jun 22 14:52:19 2020 -0700

    MR: Pass identity values via constants map in InputFormat (#1130)
---
 .../iceberg/data/IdentityPartitionConverters.java  |  62 +++++++++++
 .../org/apache/iceberg/data/TableScanIterable.java |  37 +------
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 116 ++++++---------------
 3 files changed, 93 insertions(+), 122 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
new file mode 100644
index 0000000..2b24f84
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+
+public class IdentityPartitionConverters {
+  private IdentityPartitionConverters() {
+  }
+
+  /**
+   * Conversions from internal representations to Iceberg generic values.
+   */
+  public static Object convertConstant(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case STRING:
+        return value.toString();
+      case TIME:
+        return DateTimeUtil.timeFromMicros((Long) value);
+      case DATE:
+        return DateTimeUtil.dateFromDays((Integer) value);
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return DateTimeUtil.timestamptzFromMicros((Long) value);
+        } else {
+          return DateTimeUtil.timestampFromMicros((Long) value);
+        }
+      case FIXED:
+        if (value instanceof GenericData.Fixed) {
+          return ((GenericData.Fixed) value).bytes();
+        }
+        return value;
+      default:
+    }
+    return value;
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 587c7ee..074fcc8 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
@@ -48,9 +47,6 @@ import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
 class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
@@ -81,7 +77,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
 
   private CloseableIterable<Record> open(FileScanTask task) {
     InputFile input = ops.io().newInputFile(task.file().path().toString());
-    Map<Integer, ?> partition = PartitionUtil.constantsMap(task, TableScanIterable::convertConstant);
+    Map<Integer, ?> partition = PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
 
     switch (task.file().format()) {
       case AVRO:
@@ -193,35 +189,4 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
       }
     }
   }
-
-  /**
-   * Conversions from generic Avro values to Iceberg generic values.
-   */
-  private static Object convertConstant(Type type, Object value) {
-    if (value == null) {
-      return null;
-    }
-
-    switch (type.typeId()) {
-      case STRING:
-        return value.toString();
-      case TIME:
-        return DateTimeUtil.timeFromMicros((Long) value);
-      case DATE:
-        return DateTimeUtil.dateFromDays((Integer) value);
-      case TIMESTAMP:
-        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
-          return DateTimeUtil.timestamptzFromMicros((Long) value);
-        } else {
-          return DateTimeUtil.timestampFromMicros((Long) value);
-        }
-      case FIXED:
-        if (value instanceof GenericData.Fixed) {
-          return ((GenericData.Fixed) value).bytes();
-        }
-        return value;
-      default:
-    }
-    return value;
-  }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 9db226e..b209a80 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -22,10 +22,12 @@ package org.apache.iceberg.mr.mapreduce;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -38,7 +40,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
@@ -50,8 +51,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -70,10 +70,9 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -284,7 +283,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     private boolean reuseContainers;
     private boolean caseSensitive;
     private InMemoryDataModel inMemoryDataModel;
-    private Map<String, Integer> namesToPos;
     private Iterator<FileScanTask> tasks;
     private T currentRow;
     private CloseableIterator<T> currentIterator;
@@ -299,11 +297,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       this.tableSchema = SchemaParser.fromJson(conf.get(TABLE_SCHEMA));
       String readSchemaStr = conf.get(READ_SCHEMA);
       this.expectedSchema = readSchemaStr != null ? SchemaParser.fromJson(readSchemaStr) : tableSchema;
-      this.namesToPos = buildNameToPos(expectedSchema);
       this.reuseContainers = conf.getBoolean(REUSE_CONTAINERS, false);
       this.caseSensitive = conf.getBoolean(CASE_SENSITIVE, true);
       this.inMemoryDataModel = conf.getEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC);
-      this.currentIterator = open(tasks.next());
+      this.currentIterator = open(tasks.next(), expectedSchema).iterator();
     }
 
     @Override
@@ -314,7 +311,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           return true;
         } else if (tasks.hasNext()) {
           currentIterator.close();
-          currentIterator = open(tasks.next());
+          currentIterator = open(tasks.next(), expectedSchema).iterator();
         } else {
           currentIterator.close();
           return false;
@@ -348,35 +345,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       currentIterator.close();
     }
 
-    private static Map<String, Integer> buildNameToPos(Schema expectedSchema) {
-      Map<String, Integer> nameToPos = Maps.newHashMap();
-      for (int pos = 0; pos < expectedSchema.asStruct().fields().size(); pos++) {
-        Types.NestedField field = expectedSchema.asStruct().fields().get(pos);
-        nameToPos.put(field.name(), pos);
-      }
-      return nameToPos;
-    }
-
-    private CloseableIterator<T> open(FileScanTask currentTask) {
-      DataFile file = currentTask.file();
-      // schema of rows returned by readers
-      PartitionSpec spec = currentTask.spec();
-      Set<Integer> idColumns =  Sets.intersection(spec.identitySourceIds(), TypeUtil.getProjectedIds(expectedSchema));
-      boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
-
-      CloseableIterable<T> iterable;
-      if (hasJoinedPartitionColumns) {
-        Schema readDataSchema = TypeUtil.selectNot(expectedSchema, idColumns);
-        Schema identityPartitionSchema = TypeUtil.select(expectedSchema, idColumns);
-        iterable = CloseableIterable.transform(open(currentTask, readDataSchema),
-            row -> withIdentityPartitionColumns(row, identityPartitionSchema, spec, file.partition()));
-      } else {
-        iterable = open(currentTask, expectedSchema);
-      }
-
-      return iterable.iterator();
-    }
-
     private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
       DataFile file = currentTask.file();
       // TODO we should make use of FileIO to create inputFile
@@ -400,49 +368,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       return iterable;
     }
 
-    @SuppressWarnings("unchecked")
-    private T withIdentityPartitionColumns(
-        T row, Schema identityPartitionSchema, PartitionSpec spec, StructLike partition) {
-      switch (inMemoryDataModel) {
-        case PIG:
-        case HIVE:
-          throw new UnsupportedOperationException(
-              "Adding partition columns to Pig and Hive data model are not supported yet");
-        case GENERIC:
-          return (T) withIdentityPartitionColumns((Record) row, identityPartitionSchema, spec, partition);
-      }
-      return row;
-    }
-
-    private Record withIdentityPartitionColumns(
-        Record record, Schema identityPartitionSchema, PartitionSpec spec, StructLike partitionTuple) {
-      List<PartitionField> partitionFields = spec.fields();
-      List<Types.NestedField> identityColumns = identityPartitionSchema.columns();
-      GenericRecord row = GenericRecord.create(expectedSchema.asStruct());
-      namesToPos.forEach((name, pos) -> {
-        Object field = record.getField(name);
-        if (field != null) {
-          row.set(pos, field);
-        }
-
-        // if the current name, pos points to an identity partition column, we set the
-        // column at pos correctly by reading the corresponding value from partitionTuple`
-        for (int i = 0; i < identityColumns.size(); i++) {
-          Types.NestedField identityColumn = identityColumns.get(i);
-          for (int j = 0; j < partitionFields.size(); j++) {
-            PartitionField partitionField = partitionFields.get(j);
-            if (name.equals(identityColumn.name()) &&
-                identityColumn.fieldId() == partitionField.sourceId() &&
-                "identity".equals(partitionField.transform().toString())) {
-              row.set(pos, partitionTuple.get(j, spec.javaClasses()[j]));
-            }
-          }
-        }
-      });
-
-      return row;
-    }
-
     private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, Expression residual,
                                                         Schema readSchema) {
       boolean applyResidual = !context.getConfiguration().getBoolean(SKIP_RESIDUAL_FILTERING, false);
@@ -455,7 +380,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       }
     }
 
-    private CloseableIterable<T> newAvroIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
+    private CloseableIterable<T> newAvroIterable(
+        InputFile inputFile, FileScanTask task, Schema readSchema) {
       Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
           .project(readSchema)
           .split(task.start(), task.length());
@@ -469,7 +395,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           //TODO implement value readers for Pig and Hive
           throw new UnsupportedOperationException("Avro support not yet supported for Pig and Hive");
         case GENERIC:
-          avroReadBuilder.createReaderFunc(DataReader::create);
+          avroReadBuilder.createReaderFunc(
+              (expIcebergSchema, expAvroSchema) ->
+                  DataReader.create(expIcebergSchema, expAvroSchema,
+                      constantsMap(task, IdentityPartitionConverters::convertConstant)));
       }
       return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
     }
@@ -491,7 +420,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           throw new UnsupportedOperationException("Parquet support not yet supported for Pig and Hive");
         case GENERIC:
           parquetReadBuilder.createReaderFunc(
-              fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema));
+              fileSchema -> GenericParquetReaders.buildReader(
+                  readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
       }
       return applyResidualFiltering(parquetReadBuilder.build(), task.residual(), readSchema);
     }
@@ -509,11 +439,25 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           //TODO: implement value readers for Pig and Hive
           throw new UnsupportedOperationException("ORC support not yet supported for Pig and Hive");
         case GENERIC:
-          orcReadBuilder.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema));
+          orcReadBuilder.createReaderFunc(
+              fileSchema -> GenericOrcReader.buildReader(
+                  readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
       }
 
       return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema);
     }
+
+    private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
+      PartitionSpec spec = task.spec();
+      Set<Integer> idColumns = spec.identitySourceIds();
+      Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
+      boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
+      if (projectsIdentityPartitionColumns) {
+        return PartitionUtil.constantsMap(task, converter);
+      } else {
+        return Collections.emptyMap();
+      }
+    }
   }
 
   private static Table findTable(Configuration conf) {