You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/22 21:52:27 UTC
[iceberg] branch master updated: MR: Pass identity values via constants map in InputFormat (#1130)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new afb6a12 MR: Pass identity values via constants map in InputFormat (#1130)
afb6a12 is described below
commit afb6a12027b1eaffafd667a77ade81f7bd0180d3
Author: Ratandeep Ratti <rd...@gmail.com>
AuthorDate: Mon Jun 22 14:52:19 2020 -0700
MR: Pass identity values via constants map in InputFormat (#1130)
---
.../iceberg/data/IdentityPartitionConverters.java | 62 +++++++++++
.../org/apache/iceberg/data/TableScanIterable.java | 37 +------
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 116 ++++++---------------
3 files changed, 93 insertions(+), 122 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
new file mode 100644
index 0000000..2b24f84
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+
+public class IdentityPartitionConverters {
+ private IdentityPartitionConverters() {
+ }
+
+ /**
+ * Conversions from internal representations to Iceberg generic values.
+ */
+ public static Object convertConstant(Type type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ switch (type.typeId()) {
+ case STRING:
+ return value.toString();
+ case TIME:
+ return DateTimeUtil.timeFromMicros((Long) value);
+ case DATE:
+ return DateTimeUtil.dateFromDays((Integer) value);
+ case TIMESTAMP:
+ if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+ return DateTimeUtil.timestamptzFromMicros((Long) value);
+ } else {
+ return DateTimeUtil.timestampFromMicros((Long) value);
+ }
+ case FIXED:
+ if (value instanceof GenericData.Fixed) {
+ return ((GenericData.Fixed) value).bytes();
+ }
+ return value;
+ default:
+ }
+ return value;
+ }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 587c7ee..074fcc8 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
-import org.apache.avro.generic.GenericData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
@@ -48,9 +47,6 @@ import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.PartitionUtil;
class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
@@ -81,7 +77,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
private CloseableIterable<Record> open(FileScanTask task) {
InputFile input = ops.io().newInputFile(task.file().path().toString());
- Map<Integer, ?> partition = PartitionUtil.constantsMap(task, TableScanIterable::convertConstant);
+ Map<Integer, ?> partition = PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
switch (task.file().format()) {
case AVRO:
@@ -193,35 +189,4 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
}
}
}
-
- /**
- * Conversions from generic Avro values to Iceberg generic values.
- */
- private static Object convertConstant(Type type, Object value) {
- if (value == null) {
- return null;
- }
-
- switch (type.typeId()) {
- case STRING:
- return value.toString();
- case TIME:
- return DateTimeUtil.timeFromMicros((Long) value);
- case DATE:
- return DateTimeUtil.dateFromDays((Integer) value);
- case TIMESTAMP:
- if (((Types.TimestampType) type).shouldAdjustToUTC()) {
- return DateTimeUtil.timestamptzFromMicros((Long) value);
- } else {
- return DateTimeUtil.timestampFromMicros((Long) value);
- }
- case FIXED:
- if (value instanceof GenericData.Fixed) {
- return ((GenericData.Fixed) value).bytes();
- }
- return value;
- default:
- }
- return value;
- }
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 9db226e..b209a80 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -22,10 +22,12 @@ package org.apache.iceberg.mr.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -38,7 +40,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
@@ -50,8 +51,7 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -70,10 +70,9 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -284,7 +283,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
private boolean reuseContainers;
private boolean caseSensitive;
private InMemoryDataModel inMemoryDataModel;
- private Map<String, Integer> namesToPos;
private Iterator<FileScanTask> tasks;
private T currentRow;
private CloseableIterator<T> currentIterator;
@@ -299,11 +297,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
this.tableSchema = SchemaParser.fromJson(conf.get(TABLE_SCHEMA));
String readSchemaStr = conf.get(READ_SCHEMA);
this.expectedSchema = readSchemaStr != null ? SchemaParser.fromJson(readSchemaStr) : tableSchema;
- this.namesToPos = buildNameToPos(expectedSchema);
this.reuseContainers = conf.getBoolean(REUSE_CONTAINERS, false);
this.caseSensitive = conf.getBoolean(CASE_SENSITIVE, true);
this.inMemoryDataModel = conf.getEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC);
- this.currentIterator = open(tasks.next());
+ this.currentIterator = open(tasks.next(), expectedSchema).iterator();
}
@Override
@@ -314,7 +311,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
return true;
} else if (tasks.hasNext()) {
currentIterator.close();
- currentIterator = open(tasks.next());
+ currentIterator = open(tasks.next(), expectedSchema).iterator();
} else {
currentIterator.close();
return false;
@@ -348,35 +345,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
currentIterator.close();
}
- private static Map<String, Integer> buildNameToPos(Schema expectedSchema) {
- Map<String, Integer> nameToPos = Maps.newHashMap();
- for (int pos = 0; pos < expectedSchema.asStruct().fields().size(); pos++) {
- Types.NestedField field = expectedSchema.asStruct().fields().get(pos);
- nameToPos.put(field.name(), pos);
- }
- return nameToPos;
- }
-
- private CloseableIterator<T> open(FileScanTask currentTask) {
- DataFile file = currentTask.file();
- // schema of rows returned by readers
- PartitionSpec spec = currentTask.spec();
- Set<Integer> idColumns = Sets.intersection(spec.identitySourceIds(), TypeUtil.getProjectedIds(expectedSchema));
- boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
-
- CloseableIterable<T> iterable;
- if (hasJoinedPartitionColumns) {
- Schema readDataSchema = TypeUtil.selectNot(expectedSchema, idColumns);
- Schema identityPartitionSchema = TypeUtil.select(expectedSchema, idColumns);
- iterable = CloseableIterable.transform(open(currentTask, readDataSchema),
- row -> withIdentityPartitionColumns(row, identityPartitionSchema, spec, file.partition()));
- } else {
- iterable = open(currentTask, expectedSchema);
- }
-
- return iterable.iterator();
- }
-
private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
DataFile file = currentTask.file();
// TODO we should make use of FileIO to create inputFile
@@ -400,49 +368,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
return iterable;
}
- @SuppressWarnings("unchecked")
- private T withIdentityPartitionColumns(
- T row, Schema identityPartitionSchema, PartitionSpec spec, StructLike partition) {
- switch (inMemoryDataModel) {
- case PIG:
- case HIVE:
- throw new UnsupportedOperationException(
- "Adding partition columns to Pig and Hive data model are not supported yet");
- case GENERIC:
- return (T) withIdentityPartitionColumns((Record) row, identityPartitionSchema, spec, partition);
- }
- return row;
- }
-
- private Record withIdentityPartitionColumns(
- Record record, Schema identityPartitionSchema, PartitionSpec spec, StructLike partitionTuple) {
- List<PartitionField> partitionFields = spec.fields();
- List<Types.NestedField> identityColumns = identityPartitionSchema.columns();
- GenericRecord row = GenericRecord.create(expectedSchema.asStruct());
- namesToPos.forEach((name, pos) -> {
- Object field = record.getField(name);
- if (field != null) {
- row.set(pos, field);
- }
-
- // if the current name, pos points to an identity partition column, we set the
- // column at pos correctly by reading the corresponding value from partitionTuple`
- for (int i = 0; i < identityColumns.size(); i++) {
- Types.NestedField identityColumn = identityColumns.get(i);
- for (int j = 0; j < partitionFields.size(); j++) {
- PartitionField partitionField = partitionFields.get(j);
- if (name.equals(identityColumn.name()) &&
- identityColumn.fieldId() == partitionField.sourceId() &&
- "identity".equals(partitionField.transform().toString())) {
- row.set(pos, partitionTuple.get(j, spec.javaClasses()[j]));
- }
- }
- }
- });
-
- return row;
- }
-
private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, Expression residual,
Schema readSchema) {
boolean applyResidual = !context.getConfiguration().getBoolean(SKIP_RESIDUAL_FILTERING, false);
@@ -455,7 +380,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
}
}
- private CloseableIterable<T> newAvroIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
+ private CloseableIterable<T> newAvroIterable(
+ InputFile inputFile, FileScanTask task, Schema readSchema) {
Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
.project(readSchema)
.split(task.start(), task.length());
@@ -469,7 +395,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
//TODO implement value readers for Pig and Hive
throw new UnsupportedOperationException("Avro support not yet supported for Pig and Hive");
case GENERIC:
- avroReadBuilder.createReaderFunc(DataReader::create);
+ avroReadBuilder.createReaderFunc(
+ (expIcebergSchema, expAvroSchema) ->
+ DataReader.create(expIcebergSchema, expAvroSchema,
+ constantsMap(task, IdentityPartitionConverters::convertConstant)));
}
return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
}
@@ -491,7 +420,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
throw new UnsupportedOperationException("Parquet support not yet supported for Pig and Hive");
case GENERIC:
parquetReadBuilder.createReaderFunc(
- fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema));
+ fileSchema -> GenericParquetReaders.buildReader(
+ readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
}
return applyResidualFiltering(parquetReadBuilder.build(), task.residual(), readSchema);
}
@@ -509,11 +439,25 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
//TODO: implement value readers for Pig and Hive
throw new UnsupportedOperationException("ORC support not yet supported for Pig and Hive");
case GENERIC:
- orcReadBuilder.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema));
+ orcReadBuilder.createReaderFunc(
+ fileSchema -> GenericOrcReader.buildReader(
+ readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
}
return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema);
}
+
+ private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
+ PartitionSpec spec = task.spec();
+ Set<Integer> idColumns = spec.identitySourceIds();
+ Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
+ boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
+ if (projectsIdentityPartitionColumns) {
+ return PartitionUtil.constantsMap(task, converter);
+ } else {
+ return Collections.emptyMap();
+ }
+ }
}
private static Table findTable(Configuration conf) {