Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/04/11 15:51:42 UTC
[incubator-iceberg] branch master updated: Parquet: Support constant map for partition values (#909)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3a35c07 Parquet: Support constant map for partition values (#909)
3a35c07 is described below
commit 3a35c0764a51008fa5c5ecea109c87f92106dbcf
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Sat Apr 11 08:51:33 2020 -0700
Parquet: Support constant map for partition values (#909)
This is a follow-up to #896, which added the same constant map support for Avro.
Fixes #575 for Parquet and replaces #585. Andrei did a lot of the work for this in #585.
Co-authored-by: Andrei Ionescu <we...@gmail.com>
---
.../java/org/apache/iceberg/avro/ValueReaders.java | 10 +---
.../org/apache/iceberg/util/PartitionUtil.java | 16 ++++-
.../java/org/apache/iceberg/data/DateTimeUtil.java | 52 ++++++++++++++++
.../org/apache/iceberg/data/TableScanIterable.java | 38 +++++++++++-
.../apache/iceberg/data/avro/GenericReaders.java | 15 ++---
.../data/parquet/GenericParquetReaders.java | 38 ++++++++----
.../iceberg/spark/data/SparkParquetReaders.java | 39 ++++++++----
.../iceberg/spark/data/SparkValueReaders.java | 28 ---------
.../apache/iceberg/spark/source/RowDataReader.java | 47 +++++++++++++--
.../iceberg/spark/source/TestPartitionValues.java | 69 ++++++++++++++++++++++
10 files changed, 272 insertions(+), 80 deletions(-)
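
Taken together, these changes let Parquet readers fill partition-derived columns from task metadata instead of reading them from data files, matching what #896 did for Avro. A minimal sketch of the wiring, using only APIs that appear in the diffs below (the identity converter lambda is illustrative; engines supply their own conversions):

    import java.util.Map;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.parquet.GenericParquetReaders;
    import org.apache.iceberg.parquet.ParquetValueReader;
    import org.apache.iceberg.util.PartitionUtil;
    import org.apache.parquet.schema.MessageType;

    class ConstantMapSketch {
      // Build a Parquet reader whose partition columns are served from the
      // constants in idToConstant rather than from the data file itself.
      static ParquetValueReader<GenericRecord> reader(
          FileScanTask task, Schema projection, MessageType fileSchema) {
        Map<Integer, ?> idToConstant =
            PartitionUtil.constantsMap(task, (type, value) -> value);
        return GenericParquetReaders.buildReader(projection, fileSchema, idToConstant);
      }
    }
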
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index cbbf77c..7d63509 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,7 +40,6 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import static java.util.Collections.emptyIterator;
@@ -580,10 +579,9 @@ public class ValueReaders {
List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
for (int pos = 0; pos < fields.size(); pos += 1) {
Types.NestedField field = fields.get(pos);
- Object constant = idToConstant.get(field.fieldId());
- if (constant != null) {
+ if (idToConstant.containsKey(field.fieldId())) {
positionList.add(pos);
- constantList.add(prepareConstant(field.type(), constant));
+ constantList.add(idToConstant.get(field.fieldId()));
}
}
@@ -597,10 +595,6 @@ public class ValueReaders {
protected abstract void set(S struct, int pos, Object value);
- protected Object prepareConstant(Type type, Object value) {
- return value;
- }
-
public ValueReader<?> reader(int pos) {
return readers[pos];
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index 9a2aa99..1ef67db 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -22,26 +22,36 @@ package org.apache.iceberg.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
public class PartitionUtil {
private PartitionUtil() {
}
public static Map<Integer, ?> constantsMap(FileScanTask task) {
- return constantsMap(task.spec(), task.file().partition());
+ return constantsMap(task, (type, constant) -> constant);
}
- private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
+ public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+ return constantsMap(task.spec(), task.file().partition(), convertConstant);
+ }
+
+ private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData,
+ BiFunction<Type, Object, Object> convertConstant) {
// use java.util.HashMap because partition data may contain null values
Map<Integer, Object> idToConstant = new HashMap<>();
+ List<Types.NestedField> partitionFields = spec.partitionType().fields();
List<PartitionField> fields = spec.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
PartitionField field = fields.get(pos);
- idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
+ Object converted = convertConstant.apply(partitionFields.get(pos).type(), partitionData.get(pos, Object.class));
+ idToConstant.put(field.sourceId(), converted);
}
return idToConstant;
}
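
The new overload lets each engine convert raw partition values (as stored in the manifest) into its own runtime representation; the single-argument version keeps the old behavior by passing an identity function. A hedged sketch of a converter, modeled on the convertConstant methods later in this commit:

    // Illustrative converter: adapt raw partition values by Iceberg type.
    BiFunction<Type, Object, Object> convert = (type, value) -> {
      if (value == null) {
        return null;  // partition values may be null
      }
      switch (type.typeId()) {
        case STRING:
          return value.toString();  // e.g. Avro Utf8 -> java.lang.String
        default:
          return value;
      }
    };
    Map<Integer, ?> constants = PartitionUtil.constantsMap(task, convert);
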
diff --git a/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java b/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
new file mode 100644
index 0000000..d6ab178
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+
+public class DateTimeUtil {
+ private DateTimeUtil() {
+ }
+
+ private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+ public static LocalDate dateFromDays(int daysFromEpoch) {
+ return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
+ }
+
+ public static LocalTime timeFromMicros(long microFromMidnight) {
+ return LocalTime.ofNanoOfDay(microFromMidnight * 1000);
+ }
+
+ public static LocalDateTime timestampFromMicros(long microsFromEpoch) {
+ return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime();
+ }
+
+ public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
+ return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
+ }
+}
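
These helpers centralize the epoch arithmetic that was previously inlined in GenericReaders. A few expected values, as a sanity sketch:

    DateTimeUtil.dateFromDays(1);                   // 1970-01-02
    DateTimeUtil.timeFromMicros(1_000_000L);        // 00:00:01
    DateTimeUtil.timestampFromMicros(1_000_000L);   // 1970-01-01T00:00:01
    DateTimeUtil.timestamptzFromMicros(1_000_000L); // 1970-01-01T00:00:01Z
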
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index baa2320..a683c01 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import org.apache.avro.generic.GenericData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
@@ -46,6 +47,8 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
@@ -76,7 +79,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
private CloseableIterable<Record> open(FileScanTask task) {
InputFile input = ops.io().newInputFile(task.file().path().toString());
- Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
+ Map<Integer, ?> partition = PartitionUtil.constantsMap(task, TableScanIterable::convertConstant);
// TODO: join to partition data from the manifest file
switch (task.file().format()) {
@@ -96,7 +99,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
case PARQUET:
Parquet.ReadBuilder parquet = Parquet.read(input)
.project(projection)
- .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema))
+ .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema, partition))
.split(task.start(), task.length());
if (reuseContainers) {
@@ -185,4 +188,35 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
}
}
}
+
+ /**
+ * Conversions from generic Avro values to Iceberg generic values.
+ */
+ private static Object convertConstant(Type type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ switch (type.typeId()) {
+ case STRING:
+ return value.toString();
+ case TIME:
+ return DateTimeUtil.timeFromMicros((Long) value);
+ case DATE:
+ return DateTimeUtil.dateFromDays((Integer) value);
+ case TIMESTAMP:
+ if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+ return DateTimeUtil.timestamptzFromMicros((Long) value);
+ } else {
+ return DateTimeUtil.timestampFromMicros((Long) value);
+ }
+ case FIXED:
+ if (value instanceof GenericData.Fixed) {
+ return ((GenericData.Fixed) value).bytes();
+ }
+ return value;
+ default:
+ }
+ return value;
+ }
}
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 7502d15..df86672 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -20,18 +20,16 @@
package org.apache.iceberg.data.avro;
import java.io.IOException;
-import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.data.DateTimeUtil;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types.StructType;
@@ -60,9 +58,6 @@ class GenericReaders {
return new GenericRecordReader(readers, struct, idToConstant);
}
- private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
- private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
private static class DateReader implements ValueReader<LocalDate> {
private static final DateReader INSTANCE = new DateReader();
@@ -71,7 +66,7 @@ class GenericReaders {
@Override
public LocalDate read(Decoder decoder, Object reuse) throws IOException {
- return EPOCH_DAY.plusDays(decoder.readInt());
+ return DateTimeUtil.dateFromDays(decoder.readInt());
}
}
@@ -83,7 +78,7 @@ class GenericReaders {
@Override
public LocalTime read(Decoder decoder, Object reuse) throws IOException {
- return LocalTime.ofNanoOfDay(decoder.readLong() * 1000);
+ return DateTimeUtil.timeFromMicros(decoder.readLong());
}
}
@@ -95,7 +90,7 @@ class GenericReaders {
@Override
public LocalDateTime read(Decoder decoder, Object reuse) throws IOException {
- return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS).toLocalDateTime();
+ return DateTimeUtil.timestampFromMicros(decoder.readLong());
}
}
@@ -107,7 +102,7 @@ class GenericReaders {
@Override
public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
- return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS);
+ return DateTimeUtil.timestamptzFromMicros(decoder.readLong());
}
}
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 02b3bcc..bc6767f 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.data.parquet;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.time.Instant;
@@ -65,23 +66,28 @@ public class GenericParquetReaders {
private GenericParquetReaders() {
}
- @SuppressWarnings("unchecked")
public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?> idToConstant) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
return (ParquetValueReader<GenericRecord>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new ReadBuilder(fileSchema));
+ new ReadBuilder(fileSchema, idToConstant));
} else {
return (ParquetValueReader<GenericRecord>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new FallbackReadBuilder(fileSchema));
+ new FallbackReadBuilder(fileSchema, idToConstant));
}
}
private static class FallbackReadBuilder extends ReadBuilder {
- FallbackReadBuilder(MessageType type) {
- super(type);
+ FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ super(type, idToConstant);
}
@Override
@@ -112,9 +118,11 @@ public class GenericParquetReaders {
private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
+ private final Map<Integer, ?> idToConstant;
- ReadBuilder(MessageType type) {
+ ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
this.type = type;
+ this.idToConstant = idToConstant;
}
@Override
@@ -145,13 +153,19 @@ public class GenericParquetReaders {
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
- ParquetValueReader<?> reader = readersById.get(id);
- if (reader != null) {
- reorderedFields.add(reader);
- types.add(typesById.get(id));
- } else {
- reorderedFields.add(ParquetValueReaders.nulls());
+ if (idToConstant.containsKey(id)) {
+ // containsKey is used because the constant may be null
+ reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
types.add(null);
+ } else {
+ ParquetValueReader<?> reader = readersById.get(id);
+ if (reader != null) {
+ reorderedFields.add(reader);
+ types.add(typesById.get(id));
+ } else {
+ reorderedFields.add(ParquetValueReaders.nulls());
+ types.add(null);
+ }
}
}
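
With the constant map in place, each expected field is resolved in a three-way order when struct fields are reassembled. A compact restatement of the branch above (readersById and typesById as in the hunk):

    ParquetValueReader<?> fieldReader;
    if (idToConstant.containsKey(id)) {
      // containsKey, not a null check: the constant itself may be null
      fieldReader = ParquetValueReaders.constant(idToConstant.get(id));
    } else if (readersById.get(id) != null) {
      fieldReader = readersById.get(id);          // projected from the file
    } else {
      fieldReader = ParquetValueReaders.nulls();  // missing column -> nulls
    }
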
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 9a36266..190b708 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.spark.data;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.math.BigDecimal;
@@ -66,23 +67,29 @@ public class SparkParquetReaders {
private SparkParquetReaders() {
}
- @SuppressWarnings("unchecked")
public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?> idToConstant) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
return (ParquetValueReader<InternalRow>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new ReadBuilder(fileSchema));
+ new ReadBuilder(fileSchema, idToConstant));
} else {
return (ParquetValueReader<InternalRow>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new FallbackReadBuilder(fileSchema));
+ new FallbackReadBuilder(fileSchema, idToConstant));
}
}
private static class FallbackReadBuilder extends ReadBuilder {
- FallbackReadBuilder(MessageType type) {
- super(type);
+ FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ super(type, idToConstant);
}
@Override
@@ -113,9 +120,11 @@ public class SparkParquetReaders {
private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
+ private final Map<Integer, ?> idToConstant;
- ReadBuilder(MessageType type) {
+ ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
this.type = type;
+ this.idToConstant = idToConstant;
}
@Override
@@ -146,13 +155,19 @@ public class SparkParquetReaders {
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
- ParquetValueReader<?> reader = readersById.get(id);
- if (reader != null) {
- reorderedFields.add(reader);
- types.add(typesById.get(id));
- } else {
- reorderedFields.add(ParquetValueReaders.nulls());
+ if (idToConstant.containsKey(id)) {
+ // containsKey is used because the constant may be null
+ reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
types.add(null);
+ } else {
+ ParquetValueReader<?> reader = readersById.get(id);
+ if (reader != null) {
+ reorderedFields.add(reader);
+ types.add(typesById.get(id));
+ } else {
+ reorderedFields.add(ParquetValueReaders.nulls());
+ types.add(null);
+ }
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index b799fe8..7408ca2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -29,14 +29,11 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.avro.generic.GenericData;
import org.apache.avro.io.Decoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -287,30 +284,5 @@ public class SparkValueReaders {
struct.setNullAt(pos);
}
}
-
- @Override
- protected Object prepareConstant(Type type, Object value) {
- switch (type.typeId()) {
- case DECIMAL:
- return Decimal.apply((BigDecimal) value);
- case STRING:
- if (value instanceof Utf8) {
- Utf8 utf8 = (Utf8) value;
- return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
- }
- return UTF8String.fromString(value.toString());
- case FIXED:
- if (value instanceof byte[]) {
- return value;
- } else if (value instanceof GenericData.Fixed) {
- return ((GenericData.Fixed) value).bytes();
- }
- return ByteBuffers.toByteArray((ByteBuffer) value);
- case BINARY:
- return ByteBuffers.toByteArray((ByteBuffer) value);
- default:
- }
- return value;
- }
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 61c6fa2..ec9aa70 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -24,10 +24,14 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
@@ -47,8 +51,10 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -56,11 +62,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.collection.JavaConverters;
class RowDataReader extends BaseDataReader<InternalRow> {
- private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO);
+ private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO, FileFormat.PARQUET);
// for some reason, the apply method can't be called from Java without reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
@@ -103,7 +111,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
if (hasJoinedPartitionColumns) {
if (SUPPORTS_CONSTANTS.contains(file.format())) {
iterSchema = requiredSchema;
- iter = open(task, requiredSchema, PartitionUtil.constantsMap(task));
+ iter = open(task, requiredSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
} else {
// schema used to read data files
Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
@@ -144,7 +152,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
switch (task.file().format()) {
case PARQUET:
- iter = newParquetIterable(location, task, readSchema);
+ iter = newParquetIterable(location, task, readSchema, idToConstant);
break;
case AVRO:
@@ -182,11 +190,12 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private CloseableIterable<InternalRow> newParquetIterable(
InputFile location,
FileScanTask task,
- Schema readSchema) {
+ Schema readSchema,
+ Map<Integer, ?> idToConstant) {
return Parquet.read(location)
.project(readSchema)
.split(task.start(), task.length())
- .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
+ .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
.filter(task.residual())
.caseSensitive(caseSensitive)
.build();
@@ -233,4 +242,32 @@ class RowDataReader extends BaseDataReader<InternalRow> {
JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
}
+
+ private static Object convertConstant(Type type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ switch (type.typeId()) {
+ case DECIMAL:
+ return Decimal.apply((BigDecimal) value);
+ case STRING:
+ if (value instanceof Utf8) {
+ Utf8 utf8 = (Utf8) value;
+ return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
+ }
+ return UTF8String.fromString(value.toString());
+ case FIXED:
+ if (value instanceof byte[]) {
+ return value;
+ } else if (value instanceof GenericData.Fixed) {
+ return ((GenericData.Fixed) value).bytes();
+ }
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ case BINARY:
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ default:
+ }
+ return value;
+ }
}
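
The Spark-specific conversions that previously lived in SparkValueReaders.prepareConstant now run once per task here, before the constants reach the reader. An illustrative sketch of the target representations (mirroring the switch above, not calling the private method):

    UTF8String s = UTF8String.fromString("berlin");                      // STRING
    Decimal d = Decimal.apply(new BigDecimal("12.50"));                  // DECIMAL
    byte[] b = ByteBuffers.toByteArray(ByteBuffer.wrap(new byte[] {1})); // BINARY
    // DATE and TIMESTAMP fall through the default case: Spark's InternalRow
    // already stores dates as int days and timestamps as long micros.
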
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index 059d744..e6c7621 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -307,4 +308,72 @@ public class TestPartitionValues {
TestTables.clearTables();
}
}
+
+ @Test
+ public void testNestedPartitionValues() throws Exception {
+ Assume.assumeTrue("ORC can't project nested partition values", !format.equalsIgnoreCase("orc"));
+
+ String[] columnNames = new String[] {
+ "b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"
+ };
+
+ HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+ Schema nestedSchema = new Schema(optional(1, "nested", SUPPORTED_PRIMITIVES.asStruct()));
+
+ // create a table around the source data
+ String sourceLocation = temp.newFolder("source_table").toString();
+ Table source = tables.create(nestedSchema, sourceLocation);
+
+ // write out an Avro data file with all of the data types for source data
+ List<GenericData.Record> expected = RandomData.generateList(source.schema(), 2, 128735L);
+ File avroData = temp.newFile("data.avro");
+ Assert.assertTrue(avroData.delete());
+ try (FileAppender<GenericData.Record> appender = Avro.write(Files.localOutput(avroData))
+ .schema(source.schema())
+ .build()) {
+ appender.addAll(expected);
+ }
+
+ // add the Avro data file to the source table
+ source.newAppend()
+ .appendFile(DataFiles.fromInputFile(Files.localInput(avroData), 10))
+ .commit();
+
+ Dataset<Row> sourceDF = spark.read().format("iceberg").load(sourceLocation);
+
+ try {
+ for (String column : columnNames) {
+ String desc = "partition_by_" + SUPPORTED_PRIMITIVES.findType(column).toString();
+
+ File parent = temp.newFolder(desc);
+ File location = new File(parent, "test");
+ File dataFolder = new File(location, "data");
+ Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+ PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity("nested." + column).build();
+
+ Table table = tables.create(nestedSchema, spec, location.toString());
+ table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+ sourceDF.write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(location.toString())
+ .collectAsList();
+
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+
+ for (int i = 0; i < expected.size(); i += 1) {
+ TestHelpers.assertEqualsSafe(
+ nestedSchema.asStruct(), expected.get(i), actual.get(i));
+ }
+ }
+ } finally {
+ TestTables.clearTables();
+ }
+ }
}