Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/04/11 15:51:42 UTC

[incubator-iceberg] branch master updated: Parquet: Support constant map for partition values (#909)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a35c07  Parquet: Support constant map for partition values (#909)
3a35c07 is described below

commit 3a35c0764a51008fa5c5ecea109c87f92106dbcf
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Sat Apr 11 08:51:33 2020 -0700

    Parquet: Support constant map for partition values (#909)
    
    This is a follow-up to #896, which added the same constant map support for Avro.
    
    Fixes #575 for Parquet and replaces #585. Andrei did a lot of the work for this in #585.
    
    Co-authored-by: Andrei Ionescu <we...@gmail.com>
---
 .../java/org/apache/iceberg/avro/ValueReaders.java | 10 +---
 .../org/apache/iceberg/util/PartitionUtil.java     | 16 ++++-
 .../java/org/apache/iceberg/data/DateTimeUtil.java | 52 ++++++++++++++++
 .../org/apache/iceberg/data/TableScanIterable.java | 38 +++++++++++-
 .../apache/iceberg/data/avro/GenericReaders.java   | 15 ++---
 .../data/parquet/GenericParquetReaders.java        | 38 ++++++++----
 .../iceberg/spark/data/SparkParquetReaders.java    | 39 ++++++++----
 .../iceberg/spark/data/SparkValueReaders.java      | 28 ---------
 .../apache/iceberg/spark/source/RowDataReader.java | 47 +++++++++++++--
 .../iceberg/spark/source/TestPartitionValues.java  | 69 ++++++++++++++++++++++
 10 files changed, 272 insertions(+), 80 deletions(-)
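
For orientation, a minimal sketch of how the new pieces fit together: the constants map is
built from the task's partition tuple and handed to the Parquet reader builder, so
identity-partition columns in the projection are emitted as constants instead of being read
from the data file. The wrapper class and method names below are illustrative only and are
not part of this commit; the identity lambda stands in for the engine-specific
convertConstant functions added further down.

    import java.util.Map;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetReaders;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.util.PartitionUtil;

    // Illustrative sketch, not part of the commit.
    class PartitionConstantsExample {
      // Opens a Parquet data file so that identity-partition columns in the projection
      // come from the task's partition data rather than from the file contents.
      static CloseableIterable<Record> openWithConstants(InputFile input, FileScanTask task, Schema projection) {
        // identity conversion: keep the raw partition values as-is
        Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, (type, value) -> value);
        return Parquet.read(input)
            .project(projection)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema, idToConstant))
            .split(task.start(), task.length())
            .build();
      }
    }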

diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index cbbf77c..7d63509 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,7 +40,6 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
 import static java.util.Collections.emptyIterator;
@@ -580,10 +579,9 @@ public class ValueReaders {
       List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
       for (int pos = 0; pos < fields.size(); pos += 1) {
         Types.NestedField field = fields.get(pos);
-        Object constant = idToConstant.get(field.fieldId());
-        if (constant != null) {
+        if (idToConstant.containsKey(field.fieldId())) {
           positionList.add(pos);
-          constantList.add(prepareConstant(field.type(), constant));
+          constantList.add(idToConstant.get(field.fieldId()));
         }
       }
 
@@ -597,10 +595,6 @@ public class ValueReaders {
 
     protected abstract void set(S struct, int pos, Object value);
 
-    protected Object prepareConstant(Type type, Object value) {
-      return value;
-    }
-
     public ValueReader<?> reader(int pos) {
       return readers[pos];
     }
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index 9a2aa99..1ef67db 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -22,26 +22,36 @@ package org.apache.iceberg.util;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 
 public class PartitionUtil {
   private PartitionUtil() {
   }
 
   public static Map<Integer, ?> constantsMap(FileScanTask task) {
-    return constantsMap(task.spec(), task.file().partition());
+    return constantsMap(task, (type, constant) -> constant);
   }
 
-  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
+  public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+    return constantsMap(task.spec(), task.file().partition(), convertConstant);
+  }
+
+  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData,
+                                              BiFunction<Type, Object, Object> convertConstant) {
     // use java.util.HashMap because partition data may contain null values
     Map<Integer, Object> idToConstant = new HashMap<>();
+    List<Types.NestedField> partitionFields = spec.partitionType().fields();
     List<PartitionField> fields = spec.fields();
     for (int pos = 0; pos < fields.size(); pos += 1) {
       PartitionField field = fields.get(pos);
-      idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
+      Object converted = convertConstant.apply(partitionFields.get(pos).type(), partitionData.get(pos, Object.class));
+      idToConstant.put(field.sourceId(), converted);
     }
     return idToConstant;
   }
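
The conversion hook added above is a plain BiFunction over the partition field's Iceberg type
and its raw partition value; the keys of the returned map are the source field ids. A hedged
sketch of a caller-supplied converter (class and method names are illustrative; the
engine-specific converters appear later in this commit):

    import java.util.Map;
    import java.util.function.BiFunction;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.util.PartitionUtil;

    class ConstantsMapExample {
      // Example converter: render every non-null partition value as a String
      private static final BiFunction<Type, Object, Object> TO_STRING =
          (type, value) -> value == null ? null : value.toString();

      static Map<Integer, ?> stringConstants(FileScanTask task) {
        return PartitionUtil.constantsMap(task, TO_STRING);
      }
    }
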
diff --git a/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java b/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
new file mode 100644
index 0000000..d6ab178
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+
+public class DateTimeUtil {
+  private DateTimeUtil() {
+  }
+
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  public static LocalDate dateFromDays(int daysFromEpoch) {
+    return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
+  }
+
+  public static LocalTime timeFromMicros(long microFromMidnight) {
+    return LocalTime.ofNanoOfDay(microFromMidnight * 1000);
+  }
+
+  public static LocalDateTime timestampFromMicros(long microsFromEpoch) {
+    return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime();
+  }
+
+  public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
+    return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
+  }
+}
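
A quick illustration of the new helper's contract (epoch-relative integers in, java.time
values out); the inputs below are arbitrary example values:

    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.time.OffsetDateTime;
    import org.apache.iceberg.data.DateTimeUtil;

    class DateTimeUtilExample {
      public static void main(String[] args) {
        LocalDate date = DateTimeUtil.dateFromDays(0);                        // 1970-01-01
        LocalTime time = DateTimeUtil.timeFromMicros(12_345_678L);            // 00:00:12.345678
        LocalDateTime ts = DateTimeUtil.timestampFromMicros(1_000_000L);      // 1970-01-01T00:00:01
        OffsetDateTime tstz = DateTimeUtil.timestamptzFromMicros(1_000_000L); // 1970-01-01T00:00:01Z
        System.out.println(date + " " + time + " " + ts + " " + tstz);
      }
    }
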
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index baa2320..a683c01 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
@@ -46,6 +47,8 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
 
 class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
@@ -76,7 +79,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
 
   private CloseableIterable<Record> open(FileScanTask task) {
     InputFile input = ops.io().newInputFile(task.file().path().toString());
-    Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
+    Map<Integer, ?> partition = PartitionUtil.constantsMap(task, TableScanIterable::convertConstant);
 
     // TODO: join to partition data from the manifest file
     switch (task.file().format()) {
@@ -96,7 +99,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
       case PARQUET:
         Parquet.ReadBuilder parquet = Parquet.read(input)
             .project(projection)
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema))
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema, partition))
             .split(task.start(), task.length());
 
         if (reuseContainers) {
@@ -185,4 +188,35 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
       }
     }
   }
+
+  /**
+   * Conversions from generic Avro values to Iceberg generic values.
+   */
+  private static Object convertConstant(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case STRING:
+        return value.toString();
+      case TIME:
+        return DateTimeUtil.timeFromMicros((Long) value);
+      case DATE:
+        return DateTimeUtil.dateFromDays((Integer) value);
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return DateTimeUtil.timestamptzFromMicros((Long) value);
+        } else {
+          return DateTimeUtil.timestampFromMicros((Long) value);
+        }
+      case FIXED:
+        if (value instanceof GenericData.Fixed) {
+          return ((GenericData.Fixed) value).bytes();
+        }
+        return value;
+      default:
+    }
+    return value;
+  }
 }
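
convertConstant above maps raw Avro values to Iceberg's generic in-memory classes; for
example, Avro fixed values arrive wrapped in GenericData.Fixed and are unwrapped to byte[].
A small self-contained illustration of that one case (the schema name and size are made up):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    class FixedUnwrapExample {
      public static void main(String[] args) {
        Schema fixedSchema = Schema.createFixed("md5", null, "example", 16);
        GenericData.Fixed fixed = new GenericData.Fixed(fixedSchema, new byte[16]);
        byte[] bytes = fixed.bytes();      // what convertConstant returns for FIXED constants
        System.out.println(bytes.length);  // 16
      }
    }
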
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 7502d15..df86672 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -20,18 +20,16 @@
 package org.apache.iceberg.data.avro;
 
 import java.io.IOException;
-import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.io.Decoder;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.data.DateTimeUtil;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types.StructType;
@@ -60,9 +58,6 @@ class GenericReaders {
     return new GenericRecordReader(readers, struct, idToConstant);
   }
 
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
   private static class DateReader implements ValueReader<LocalDate> {
     private static final DateReader INSTANCE = new DateReader();
 
@@ -71,7 +66,7 @@ class GenericReaders {
 
     @Override
     public LocalDate read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH_DAY.plusDays(decoder.readInt());
+      return DateTimeUtil.dateFromDays(decoder.readInt());
     }
   }
 
@@ -83,7 +78,7 @@ class GenericReaders {
 
     @Override
     public LocalTime read(Decoder decoder, Object reuse) throws IOException {
-      return LocalTime.ofNanoOfDay(decoder.readLong() * 1000);
+      return DateTimeUtil.timeFromMicros(decoder.readLong());
     }
   }
 
@@ -95,7 +90,7 @@ class GenericReaders {
 
     @Override
     public LocalDateTime read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS).toLocalDateTime();
+      return DateTimeUtil.timestampFromMicros(decoder.readLong());
     }
   }
 
@@ -107,7 +102,7 @@ class GenericReaders {
 
     @Override
     public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS);
+      return DateTimeUtil.timestamptzFromMicros(decoder.readLong());
     }
   }
 
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 02b3bcc..bc6767f 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.data.parquet;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.time.Instant;
@@ -65,23 +66,28 @@ public class GenericParquetReaders {
   private GenericParquetReaders() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
                                                               MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
+                                                              MessageType fileSchema,
+                                                              Map<Integer, ?> idToConstant) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
       return (ParquetValueReader<GenericRecord>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema));
+              new ReadBuilder(fileSchema, idToConstant));
     } else {
       return (ParquetValueReader<GenericRecord>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema));
+              new FallbackReadBuilder(fileSchema, idToConstant));
     }
   }
 
   private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type) {
-      super(type);
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
     }
 
     @Override
@@ -112,9 +118,11 @@ public class GenericParquetReaders {
 
   private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
     private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
 
-    ReadBuilder(MessageType type) {
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
       this.type = type;
+      this.idToConstant = idToConstant;
     }
 
     @Override
@@ -145,13 +153,19 @@ public class GenericParquetReaders {
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(ParquetValueReaders.nulls());
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
           types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
         }
       }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 9a36266..190b708 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.data;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.math.BigDecimal;
@@ -66,23 +67,29 @@ public class SparkParquetReaders {
   private SparkParquetReaders() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
                                                             MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
+                                                            MessageType fileSchema,
+                                                            Map<Integer, ?> idToConstant) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
       return (ParquetValueReader<InternalRow>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema));
+              new ReadBuilder(fileSchema, idToConstant));
     } else {
       return (ParquetValueReader<InternalRow>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema));
+              new FallbackReadBuilder(fileSchema, idToConstant));
     }
   }
 
   private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type) {
-      super(type);
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
     }
 
     @Override
@@ -113,9 +120,11 @@ public class SparkParquetReaders {
 
   private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
     private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
 
-    ReadBuilder(MessageType type) {
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
       this.type = type;
+      this.idToConstant = idToConstant;
     }
 
     @Override
@@ -146,13 +155,19 @@ public class SparkParquetReaders {
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(ParquetValueReaders.nulls());
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
           types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
         }
       }
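
Unlike the generic readers, the Spark reader expects constants already converted to Spark's
internal representation (for example UTF8String rather than String, as RowDataReader does
below). A hedged sketch with made-up schema objects and field id:

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.parquet.ParquetValueReader;
    import org.apache.iceberg.spark.data.SparkParquetReaders;
    import org.apache.parquet.schema.MessageType;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.unsafe.types.UTF8String;

    class SparkConstantReaderExample {
      // field id 1002 is assumed to be an identity-partitioned string column
      static ParquetValueReader<InternalRow> reader(Schema projection, MessageType fileSchema) {
        Map<Integer, ?> idToConstant = ImmutableMap.of(1002, UTF8String.fromString("2020-04-11"));
        return SparkParquetReaders.buildReader(projection, fileSchema, idToConstant);
      }
    }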
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index b799fe8..7408ca2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -29,14 +29,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -287,30 +284,5 @@ public class SparkValueReaders {
         struct.setNullAt(pos);
       }
     }
-
-    @Override
-    protected Object prepareConstant(Type type, Object value) {
-      switch (type.typeId()) {
-        case DECIMAL:
-          return Decimal.apply((BigDecimal) value);
-        case STRING:
-          if (value instanceof Utf8) {
-            Utf8 utf8 = (Utf8) value;
-            return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
-          }
-          return UTF8String.fromString(value.toString());
-        case FIXED:
-          if (value instanceof byte[]) {
-            return value;
-          } else if (value instanceof GenericData.Fixed) {
-            return ((GenericData.Fixed) value).bytes();
-          }
-          return ByteBuffers.toByteArray((ByteBuffer) value);
-        case BINARY:
-          return ByteBuffers.toByteArray((ByteBuffer) value);
-        default:
-      }
-      return value;
-    }
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 61c6fa2..ec9aa70 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -24,10 +24,14 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
@@ -47,8 +51,10 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -56,11 +62,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.collection.JavaConverters;
 
 class RowDataReader extends BaseDataReader<InternalRow> {
-  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO);
+  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO, FileFormat.PARQUET);
   // for some reason, the apply method can't be called from Java without reflection
   private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
       .impl(UnsafeProjection.class, InternalRow.class)
@@ -103,7 +111,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
     if (hasJoinedPartitionColumns) {
       if (SUPPORTS_CONSTANTS.contains(file.format())) {
         iterSchema = requiredSchema;
-        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task));
+        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
       } else {
         // schema used to read data files
         Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
@@ -144,7 +152,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
 
       switch (task.file().format()) {
         case PARQUET:
-          iter = newParquetIterable(location, task, readSchema);
+          iter = newParquetIterable(location, task, readSchema, idToConstant);
           break;
 
         case AVRO:
@@ -182,11 +190,12 @@ class RowDataReader extends BaseDataReader<InternalRow> {
   private CloseableIterable<InternalRow> newParquetIterable(
       InputFile location,
       FileScanTask task,
-      Schema readSchema) {
+      Schema readSchema,
+      Map<Integer, ?> idToConstant) {
     return Parquet.read(location)
         .project(readSchema)
         .split(task.start(), task.length())
-        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
+        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
         .filter(task.residual())
         .caseSensitive(caseSensitive)
         .build();
@@ -233,4 +242,32 @@ class RowDataReader extends BaseDataReader<InternalRow> {
         JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
         JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
   }
+
+  private static Object convertConstant(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case DECIMAL:
+        return Decimal.apply((BigDecimal) value);
+      case STRING:
+        if (value instanceof Utf8) {
+          Utf8 utf8 = (Utf8) value;
+          return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
+        }
+        return UTF8String.fromString(value.toString());
+      case FIXED:
+        if (value instanceof byte[]) {
+          return value;
+        } else if (value instanceof GenericData.Fixed) {
+          return ((GenericData.Fixed) value).bytes();
+        }
+        return ByteBuffers.toByteArray((ByteBuffer) value);
+      case BINARY:
+        return ByteBuffers.toByteArray((ByteBuffer) value);
+      default:
+    }
+    return value;
+  }
 }
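
The conversions above produce Spark's internal value types; a minimal illustration of the
STRING and DECIMAL cases with arbitrary example values:

    import java.math.BigDecimal;
    import org.apache.spark.sql.types.Decimal;
    import org.apache.spark.unsafe.types.UTF8String;

    class SparkConstantValuesExample {
      public static void main(String[] args) {
        UTF8String str = UTF8String.fromString("2020-04-11");    // STRING partition value
        Decimal dec = Decimal.apply(new BigDecimal("1234.56"));  // DECIMAL partition value
        System.out.println(str + " " + dec);
      }
    }
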
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index 059d744..e6c7621 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -307,4 +308,72 @@ public class TestPartitionValues {
       TestTables.clearTables();
     }
   }
+
+  @Test
+  public void testNestedPartitionValues() throws Exception {
+    Assume.assumeTrue("ORC can't project nested partition values", !format.equalsIgnoreCase("orc"));
+
+    String[] columnNames = new String[] {
+        "b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"
+    };
+
+    HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    Schema nestedSchema = new Schema(optional(1, "nested", SUPPORTED_PRIMITIVES.asStruct()));
+
+    // create a table around the source data
+    String sourceLocation = temp.newFolder("source_table").toString();
+    Table source = tables.create(nestedSchema, sourceLocation);
+
+    // write out an Avro data file with all of the data types for source data
+    List<GenericData.Record> expected = RandomData.generateList(source.schema(), 2, 128735L);
+    File avroData = temp.newFile("data.avro");
+    Assert.assertTrue(avroData.delete());
+    try (FileAppender<GenericData.Record> appender = Avro.write(Files.localOutput(avroData))
+        .schema(source.schema())
+        .build()) {
+      appender.addAll(expected);
+    }
+
+    // add the Avro data file to the source table
+    source.newAppend()
+        .appendFile(DataFiles.fromInputFile(Files.localInput(avroData), 10))
+        .commit();
+
+    Dataset<Row> sourceDF = spark.read().format("iceberg").load(sourceLocation);
+
+    try {
+      for (String column : columnNames) {
+        String desc = "partition_by_" + SUPPORTED_PRIMITIVES.findType(column).toString();
+
+        File parent = temp.newFolder(desc);
+        File location = new File(parent, "test");
+        File dataFolder = new File(location, "data");
+        Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+        PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity("nested." + column).build();
+
+        Table table = tables.create(nestedSchema, spec, location.toString());
+        table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+        sourceDF.write()
+            .format("iceberg")
+            .mode("append")
+            .save(location.toString());
+
+        List<Row> actual = spark.read()
+            .format("iceberg")
+            .load(location.toString())
+            .collectAsList();
+
+        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+
+        for (int i = 0; i < expected.size(); i += 1) {
+          TestHelpers.assertEqualsSafe(
+              nestedSchema.asStruct(), expected.get(i), actual.get(i));
+        }
+      }
+    } finally {
+      TestTables.clearTables();
+    }
+  }
 }