Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/26 20:11:17 UTC

[incubator-iceberg] branch vectorized-read updated: Remove all formatting only and any other unrelated diffs between master and vectorized-read branches (#673)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/vectorized-read by this push:
     new b6ca634  Remove all formatting only and any other unrelated diffs between master and vectorized-read branches (#673)
b6ca634 is described below

commit b6ca634aff56b1917c2cf6a7eca0d3ed5a200ed0
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Tue Nov 26 12:11:10 2019 -0800

    Remove all formatting only and any other unrelated diffs between master and vectorized-read branches (#673)
---
 README.md                                          |   3 -
 .../java/org/apache/iceberg/expressions/False.java |   2 +-
 .../java/org/apache/iceberg/expressions/True.java  |   2 +-
 .../apache/iceberg/transforms/ProjectionUtil.java  |   4 +-
 .../org/apache/iceberg/transforms/Truncate.java    |  12 +-
 .../iceberg/transforms/TestDatesProjection.java    |   8 +-
 .../transforms/TestTimestampsProjection.java       |   8 +-
 .../main/java/org/apache/iceberg/SchemaUpdate.java |  10 +-
 .../java/org/apache/iceberg/TableMetadata.java     |  42 ++--
 .../org/apache/iceberg/hadoop/HadoopTables.java    |   2 +-
 gradle/wrapper/gradle-wrapper.properties           |  24 ++-
 jitpack.yml                                        |   2 +-
 .../java/org/apache/iceberg/parquet/Parquet.java   |   4 +-
 .../org/apache/iceberg/parquet/ParquetFilters.java |  38 ++--
 .../java/org/apache/iceberg/parquet/ParquetIO.java |   6 +-
 .../apache/iceberg/parquet/ParquetIterable.java    |   2 +-
 .../apache/iceberg/parquet/ParquetReadSupport.java |   4 +-
 .../org/apache/iceberg/parquet/ParquetReader.java  |  10 +-
 .../org/apache/iceberg/parquet/ParquetUtil.java    |  18 +-
 .../iceberg/parquet/ParquetValueReaders.java       |  10 +-
 .../iceberg/parquet/ParquetWriteSupport.java       |   4 +-
 .../SparkParquetReadersFlatDataBenchmark.java      | 215 +++++++++++++++++++++
 .../SparkParquetReadersNestedDataBenchmark.java    | 215 +++++++++++++++++++++
 .../SparkParquetWritersFlatDataBenchmark.java      | 123 ++++++++++++
 .../SparkParquetWritersNestedDataBenchmark.java    | 122 ++++++++++++
 .../spark/source/IcebergSourceBenchmark.java       |   4 +-
 .../source/IcebergSourceFlatDataBenchmark.java     |  18 +-
 ...cebergSourceFlatParquetDataFilterBenchmark.java |  47 +----
 ...IcebergSourceFlatParquetDataReadBenchmark.java} |  70 +++----
 ...IcebergSourceFlatParquetDataWriteBenchmark.java |  16 +-
 .../iceberg/spark/data/SparkParquetReaders.java    |   6 +-
 .../apache/iceberg/spark/source/IcebergSource.java |   4 +-
 .../org/apache/iceberg/spark/source/Reader.java    |  20 +-
 .../org/apache/iceberg/spark/source/Writer.java    |   6 +-
 .../org/apache/iceberg/spark/SparkTableUtil.scala  | 112 +++++------
 .../org/apache/iceberg/spark/data/RandomData.java  |   5 +-
 .../org/apache/iceberg/spark/data/TestHelpers.java |   3 +-
 .../iceberg/spark/data/TestParquetAvroReader.java  |  10 +-
 .../iceberg/spark/data/TestParquetAvroWriter.java  |  70 ++++---
 .../iceberg/spark/data/TestSparkParquetReader.java |  67 ++++---
 .../iceberg/spark/data/TestSparkParquetWriter.java |  66 ++++---
 .../iceberg/spark/source/TestSparkTableUtil.java   |  66 +++----
 42 files changed, 1071 insertions(+), 409 deletions(-)
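
For reference, a minimal sketch of how to fetch and inspect this commit locally with standard git commands (assuming git is installed and the gitbox remote listed above is reachable):

    git clone https://gitbox.apache.org/repos/asf/incubator-iceberg.git
    cd incubator-iceberg
    git fetch origin vectorized-read
    # file-level summary of this commit (matches the stat block above)
    git show --stat b6ca634aff56b1917c2cf6a7eca0d3ed5a200ed0
    # full diff of this commit (matches the patch below)
    git show b6ca634aff56b1917c2cf6a7eca0d3ed5a200ed0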

diff --git a/README.md b/README.md
index a518a2e..68f8bf4 100644
--- a/README.md
+++ b/README.md
@@ -57,9 +57,6 @@ Iceberg is built using Gradle 5.4.1.
 * To invoke a build and run tests: `./gradlew build`
 * To skip tests: `./gradlew build -x test`
 
-* To invoke a build and run tests: `./gradlew build`
-* To skip tests: `./gradlew build -x test`
-
 Iceberg table support is organized in library modules:
 
 * `iceberg-common` contains utility classes used in other modules
diff --git a/api/src/main/java/org/apache/iceberg/expressions/False.java b/api/src/main/java/org/apache/iceberg/expressions/False.java
index 761d054..940750a 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/False.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/False.java
@@ -25,7 +25,7 @@ import java.io.ObjectStreamException;
  * An {@link Expression expression} that is always false.
  */
 public class False implements Expression {
-  public static final False INSTANCE = new False();
+  static final False INSTANCE = new False();
 
   private False() {
   }
diff --git a/api/src/main/java/org/apache/iceberg/expressions/True.java b/api/src/main/java/org/apache/iceberg/expressions/True.java
index 634e2c0..8f43bc0 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/True.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/True.java
@@ -25,7 +25,7 @@ import java.io.ObjectStreamException;
  * An {@link Expression expression} that is always true.
  */
 public class True implements Expression {
-  public static final True INSTANCE = new True();
+  static final True INSTANCE = new True();
 
   private True() {
   }
diff --git a/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java b/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
index 42b5b47..9611f56 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
@@ -191,8 +191,8 @@ class ProjectionUtil {
         return predicate(Expression.Operation.EQ, name, transform.apply(boundary));
       case STARTS_WITH:
         return predicate(Expression.Operation.STARTS_WITH, name, transform.apply(boundary));
-      //        case IN: // TODO
-      //          return Expressions.predicate(Operation.IN, name, transform.apply(boundary));
+//        case IN: // TODO
+//          return Expressions.predicate(Operation.IN, name, transform.apply(boundary));
       default:
         return null;
     }
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
index dc3ac6a..f9fd60b 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
@@ -231,7 +231,7 @@ abstract class Truncate<T> implements Transform<T, T> {
 
     @Override
     public UnboundPredicate<CharSequence> project(String name,
-        BoundPredicate<CharSequence> predicate) {
+                                                  BoundPredicate<CharSequence> predicate) {
       if (predicate.isUnaryPredicate()) {
         return Expressions.predicate(predicate.op(), name);
       } else if (predicate.isLiteralPredicate()) {
@@ -242,7 +242,7 @@ abstract class Truncate<T> implements Transform<T, T> {
 
     @Override
     public UnboundPredicate<CharSequence> projectStrict(String name,
-        BoundPredicate<CharSequence> predicate) {
+                                                        BoundPredicate<CharSequence> predicate) {
       if (predicate instanceof BoundUnaryPredicate) {
         return Expressions.predicate(predicate.op(), name);
       } else if (predicate instanceof BoundLiteralPredicate) {
@@ -313,7 +313,7 @@ abstract class Truncate<T> implements Transform<T, T> {
 
     @Override
     public UnboundPredicate<ByteBuffer> project(String name,
-        BoundPredicate<ByteBuffer> pred) {
+                                                BoundPredicate<ByteBuffer> pred) {
       if (pred.isUnaryPredicate()) {
         return Expressions.predicate(pred.op(), name);
       } else if (pred.isLiteralPredicate()) {
@@ -324,7 +324,7 @@ abstract class Truncate<T> implements Transform<T, T> {
 
     @Override
     public UnboundPredicate<ByteBuffer> projectStrict(String name,
-        BoundPredicate<ByteBuffer> pred) {
+                                                      BoundPredicate<ByteBuffer> pred) {
       if (pred.isUnaryPredicate()) {
         return Expressions.predicate(pred.op(), name);
       } else if (pred.isLiteralPredicate()) {
@@ -396,7 +396,7 @@ abstract class Truncate<T> implements Transform<T, T> {
 
     @Override
     public UnboundPredicate<BigDecimal> project(String name,
-        BoundPredicate<BigDecimal> pred) {
+                                                BoundPredicate<BigDecimal> pred) {
       if (pred.isUnaryPredicate()) {
         return Expressions.predicate(pred.op(), name);
       } else if (pred.isLiteralPredicate()) {
@@ -407,7 +407,7 @@ abstract class Truncate<T> implements Transform<T, T> {
 
     @Override
     public UnboundPredicate<BigDecimal> projectStrict(String name,
-        BoundPredicate<BigDecimal> pred) {
+                                                      BoundPredicate<BigDecimal> pred) {
       if (pred.isUnaryPredicate()) {
         return Expressions.predicate(pred.op(), name);
       } else if (pred.isLiteralPredicate()) {
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java b/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
index 6f263cc..b6602a6 100644
--- a/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
@@ -43,7 +43,7 @@ public class TestDatesProjection {
   private static final Schema SCHEMA = new Schema(optional(1, "date", TYPE));
 
   public void assertProjectionStrict(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp, String expectedLiteral) {
+                                     Expression.Operation expectedOp, String expectedLiteral) {
 
     Expression projection = Projections.strict(spec).project(filter);
     UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
@@ -57,21 +57,21 @@ public class TestDatesProjection {
   }
 
   public void assertProjectionStrictValue(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp) {
+                                          Expression.Operation expectedOp) {
 
     Expression projection = Projections.strict(spec).project(filter);
     Assert.assertEquals(projection.op(), expectedOp);
   }
 
   public void assertProjectionInclusiveValue(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp) {
+                                             Expression.Operation expectedOp) {
 
     Expression projection = Projections.inclusive(spec).project(filter);
     Assert.assertEquals(projection.op(), expectedOp);
   }
 
   public void assertProjectionInclusive(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp, String expectedLiteral) {
+                                        Expression.Operation expectedOp, String expectedLiteral) {
     Expression projection = Projections.inclusive(spec).project(filter);
     UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
 
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java b/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
index 787ec1e..3fec404 100644
--- a/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
@@ -43,7 +43,7 @@ public class TestTimestampsProjection {
   private static final Schema SCHEMA = new Schema(optional(1, "timestamp", TYPE));
 
   public void assertProjectionStrict(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp, String expectedLiteral) {
+                                     Expression.Operation expectedOp, String expectedLiteral) {
 
     Expression projection = Projections.strict(spec).project(filter);
     UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
@@ -57,21 +57,21 @@ public class TestTimestampsProjection {
   }
 
   public void assertProjectionStrictValue(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp) {
+                                          Expression.Operation expectedOp) {
 
     Expression projection = Projections.strict(spec).project(filter);
     Assert.assertEquals(projection.op(), expectedOp);
   }
 
   public void assertProjectionInclusiveValue(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp) {
+                                             Expression.Operation expectedOp) {
 
     Expression projection = Projections.inclusive(spec).project(filter);
     Assert.assertEquals(projection.op(), expectedOp);
   }
 
   public void assertProjectionInclusive(PartitionSpec spec, UnboundPredicate<?> filter,
-      Expression.Operation expectedOp, String expectedLiteral) {
+                                        Expression.Operation expectedOp, String expectedLiteral) {
     Expression projection = Projections.inclusive(spec).project(filter);
     UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
 
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index 3e53598..a1ef08c 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -309,8 +309,8 @@ class SchemaUpdate implements UpdateSchema {
   }
 
   private static Schema applyChanges(Schema schema, List<Integer> deletes,
-      Map<Integer, Types.NestedField> updates,
-      Multimap<Integer, Types.NestedField> adds) {
+                                     Map<Integer, Types.NestedField> updates,
+                                     Multimap<Integer, Types.NestedField> adds) {
     Types.StructType struct = TypeUtil
         .visit(schema, new ApplyChanges(deletes, updates, adds))
         .asNestedType().asStructType();
@@ -323,8 +323,8 @@ class SchemaUpdate implements UpdateSchema {
     private final Multimap<Integer, Types.NestedField> adds;
 
     private ApplyChanges(List<Integer> deletes,
-        Map<Integer, Types.NestedField> updates,
-        Multimap<Integer, Types.NestedField> adds) {
+                        Map<Integer, Types.NestedField> updates,
+                        Multimap<Integer, Types.NestedField> adds) {
       this.deletes = deletes;
       this.updates = updates;
       this.adds = adds;
@@ -471,7 +471,7 @@ class SchemaUpdate implements UpdateSchema {
   }
 
   private static Types.StructType addFields(Types.StructType struct,
-      Collection<Types.NestedField> adds) {
+                                            Collection<Types.NestedField> adds) {
     List<Types.NestedField> newFields = Lists.newArrayList(struct.fields());
     newFields.addAll(adds);
     return Types.StructType.of(newFields);
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 899341b..a5081a6 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -47,15 +47,15 @@ public class TableMetadata {
   static final int INITIAL_SPEC_ID = 0;
 
   public static TableMetadata newTableMetadata(Schema schema,
-      PartitionSpec spec,
-      String location) {
+                                               PartitionSpec spec,
+                                               String location) {
     return newTableMetadata(schema, spec, location, ImmutableMap.of());
   }
 
   public static TableMetadata newTableMetadata(Schema schema,
-      PartitionSpec spec,
-      String location,
-      Map<String, String> properties) {
+                                               PartitionSpec spec,
+                                               String location,
+                                               Map<String, String> properties) {
     // reassign all column ids to ensure consistency
     AtomicInteger lastColumnId = new AtomicInteger(0);
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
@@ -150,7 +150,7 @@ public class TableMetadata {
       }
       MetadataLogEntry that = (MetadataLogEntry) other;
       return timestampMillis == that.timestampMillis &&
-          java.util.Objects.equals(file, that.file);
+              java.util.Objects.equals(file, that.file);
     }
 
     @Override
@@ -186,18 +186,18 @@ public class TableMetadata {
   private final List<MetadataLogEntry> previousFiles;
 
   TableMetadata(InputFile file,
-      String uuid,
-      String location,
-      long lastUpdatedMillis,
-      int lastColumnId,
-      Schema schema,
-      int defaultSpecId,
-      List<PartitionSpec> specs,
-      Map<String, String> properties,
-      long currentSnapshotId,
-      List<Snapshot> snapshots,
-      List<HistoryEntry> snapshotLog,
-      List<MetadataLogEntry> previousFiles) {
+                String uuid,
+                String location,
+                long lastUpdatedMillis,
+                int lastColumnId,
+                Schema schema,
+                int defaultSpecId,
+                List<PartitionSpec> specs,
+                Map<String, String> properties,
+                long currentSnapshotId,
+                List<Snapshot> snapshots,
+                List<HistoryEntry> snapshotLog,
+                List<MetadataLogEntry> previousFiles) {
     this.file = file;
     this.uuid = uuid;
     this.location = location;
@@ -476,7 +476,7 @@ public class TableMetadata {
   }
 
   public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
-      Map<String, String> updatedProperties) {
+                                        Map<String, String> updatedProperties) {
     AtomicInteger nextLastColumnId = new AtomicInteger(0);
     Schema freshSchema = TypeUtil.assignFreshIds(updatedSchema, nextLastColumnId::incrementAndGet);
 
@@ -526,13 +526,13 @@ public class TableMetadata {
   }
 
   private List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis,
-      Map<String, String> updatedProperties) {
+                                                 Map<String, String> updatedProperties) {
     if (previousFile == null) {
       return previousFiles;
     }
 
     int maxSize = Math.max(1, PropertyUtil.propertyAsInt(updatedProperties,
-        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
+            TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
 
     List<MetadataLogEntry> newMetadataLog = null;
     if (previousFiles.size() >= maxSize) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index cffd6fc..9b144ef 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -126,7 +126,7 @@ public class HadoopTables implements Tables, Configurable {
    */
   @Override
   public Table create(Schema schema, PartitionSpec spec, Map<String, String> properties,
-      String location) {
+                      String location) {
     Preconditions.checkNotNull(schema, "A table schema is required");
 
     TableOperations ops = newTableOps(location);
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 768a708..1277f7d 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,24 @@
-#Wed Nov 20 14:10:01 PST 2019
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-zipStorePath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip
diff --git a/jitpack.yml b/jitpack.yml
index 5fabc87..01b9d61 100644
--- a/jitpack.yml
+++ b/jitpack.yml
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 install:
-  - ./gradlew publishToMavenLocal
\ No newline at end of file
+  - ./gradlew publishToMavenLocal
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 0e724df..60aa877 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -314,8 +314,8 @@ public class Parquet {
       return this;
     }
 
-    public ReadBuilder project(Schema sc) {
-      this.schema = sc;
+    public ReadBuilder project(Schema newSchema) {
+      this.schema = newSchema;
       return this;
     }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index 285164e..ae40da7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -176,25 +176,6 @@ class ParquetFilters {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private static <C extends Comparable<C>> C getParquetPrimitive(Literal<?> lit) {
-    if (lit == null) {
-      return null;
-    }
-
-    // TODO: this needs to convert to handle BigDecimal and UUID
-    Object value = lit.value();
-    if (value instanceof Number) {
-      return (C) lit.value();
-    } else if (value instanceof CharSequence) {
-      return (C) Binary.fromString(value.toString());
-    } else if (value instanceof ByteBuffer) {
-      return (C) Binary.fromReusedByteBuffer((ByteBuffer) value);
-    }
-    throw new UnsupportedOperationException(
-        "Type not supported yet: " + value.getClass().getName());
-  }
-
   @SuppressWarnings("checkstyle:MethodTypeParameterName")
   private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
       FilterPredicate pred(Operation op, COL col, C value) {
@@ -220,6 +201,25 @@ class ParquetFilters {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  private static <C extends Comparable<C>> C getParquetPrimitive(Literal<?> lit) {
+    if (lit == null) {
+      return null;
+    }
+
+    // TODO: this needs to convert to handle BigDecimal and UUID
+    Object value = lit.value();
+    if (value instanceof Number) {
+      return (C) lit.value();
+    } else if (value instanceof CharSequence) {
+      return (C) Binary.fromString(value.toString());
+    } else if (value instanceof ByteBuffer) {
+      return (C) Binary.fromReusedByteBuffer((ByteBuffer) value);
+    }
+    throw new UnsupportedOperationException(
+        "Type not supported yet: " + value.getClass().getName());
+  }
+
   private static class AlwaysTrue implements FilterPredicate {
     static final AlwaysTrue INSTANCE = new AlwaysTrue();
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
index 1afc211..d65b8d6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
@@ -41,11 +41,11 @@ import org.apache.parquet.io.SeekableInputStream;
 /**
  * Methods in this class translate from the IO API to Parquet's IO API.
  */
-public class ParquetIO {
+class ParquetIO {
   private ParquetIO() {
   }
 
-  public static InputFile file(org.apache.iceberg.io.InputFile file) {
+  static InputFile file(org.apache.iceberg.io.InputFile file) {
     // TODO: use reflection to avoid depending on classes from iceberg-hadoop
     // TODO: use reflection to avoid depending on classes from hadoop
     if (file instanceof HadoopInputFile) {
@@ -59,7 +59,7 @@ public class ParquetIO {
     return new ParquetInputFile(file);
   }
 
-  public static OutputFile file(org.apache.iceberg.io.OutputFile file) {
+  static OutputFile file(org.apache.iceberg.io.OutputFile file) {
     if (file instanceof HadoopOutputFile) {
       HadoopOutputFile hfile = (HadoopOutputFile) file;
       try {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
index 0000cb2..79d702e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
@@ -31,7 +31,7 @@ import org.apache.parquet.hadoop.ParquetReader;
 public class ParquetIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   private final ParquetReader.Builder<T> builder;
 
-  public ParquetIterable(ParquetReader.Builder<T> builder) {
+  ParquetIterable(ParquetReader.Builder<T> builder) {
     this.builder = builder;
   }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
index 18e2c85..9588676 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -37,12 +37,12 @@ import org.apache.parquet.schema.MessageType;
  *
  * @param <T> Java type produced by this read support instance
  */
-public class ParquetReadSupport<T> extends ReadSupport<T> {
+class ParquetReadSupport<T> extends ReadSupport<T> {
   private final Schema expectedSchema;
   private final ReadSupport<T> wrapped;
   private final boolean callInit;
 
-  public ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit) {
+  ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit) {
     this.expectedSchema = expectedSchema;
     this.wrapped = readSupport;
     this.callInit = callInit;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 7112f8d..80f092d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -47,8 +47,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private final boolean caseSensitive;
 
   public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
-      Function<MessageType, ParquetValueReader<?>> readerFunc,
-      Expression filter, boolean reuseContainers, boolean caseSensitive) {
+                       Function<MessageType, ParquetValueReader<?>> readerFunc,
+                       Expression filter, boolean reuseContainers, boolean caseSensitive) {
     this.input = input;
     this.expectedSchema = expectedSchema;
     this.options = options;
@@ -72,8 +72,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
 
     @SuppressWarnings("unchecked")
     ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
-        Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
-        boolean caseSensitive) {
+             Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
+             boolean caseSensitive) {
       this.file = file;
       this.options = options;
       this.reader = newReader(file, options);
@@ -102,7 +102,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
         BlockMetaData rowGroup = rowGroups.get(i);
         boolean shouldRead = filter == null || (
             statsFilter.shouldRead(typeWithIds, rowGroup) &&
-                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+            dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
         this.shouldSkip[i] = !shouldRead;
         if (shouldRead) {
           computedTotalValues += rowGroup.getRowCount();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 3f731de..2621f5f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -95,7 +95,7 @@ public class ParquetUtil {
         increment(columnSizes, fieldId, column.getTotalSize());
 
         String columnName = fileSchema.findColumnName(fieldId);
-        MetricsModes.MetricsMode metricsMode = metricsConfig.columnMode(columnName);
+        MetricsMode metricsMode = metricsConfig.columnMode(columnName);
         if (metricsMode == MetricsModes.None.get()) {
           continue;
         }
@@ -134,7 +134,8 @@ public class ParquetUtil {
   }
 
   /**
-   * @return a list of offsets in ascending order determined by the starting position of the row groups
+   * @return a list of offsets in ascending order determined by the starting position
+   * of the row groups
    */
   public static List<Long> getSplitOffsets(ParquetMetadata md) {
     List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
@@ -172,9 +173,8 @@ public class ParquetUtil {
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> void updateMin(
-      Map<Integer, Literal<?>> lowerBounds, int id, Type type,
-      Literal<T> min, MetricsMode metricsMode) {
+  private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type,
+                                    Literal<T> min, MetricsMode metricsMode) {
     Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
     if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
       if (metricsMode == MetricsModes.Full.get()) {
@@ -198,9 +198,8 @@ public class ParquetUtil {
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> void updateMax(
-      Map<Integer, Literal<?>> upperBounds, int id, Type type,
-      Literal<T> max, MetricsMode metricsMode) {
+  private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type,
+                                    Literal<T> max, MetricsMode metricsMode) {
     Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
     if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
       if (metricsMode == MetricsModes.Full.get()) {
@@ -226,8 +225,7 @@ public class ParquetUtil {
   private static Map<Integer, ByteBuffer> toBufferMap(Schema schema, Map<Integer, Literal<?>> map) {
     Map<Integer, ByteBuffer> bufferMap = Maps.newHashMap();
     for (Map.Entry<Integer, Literal<?>> entry : map.entrySet()) {
-      bufferMap.put(
-          entry.getKey(),
+      bufferMap.put(entry.getKey(),
           Conversions.toByteBuffer(schema.findType(entry.getKey()), entry.getValue().value()));
     }
     return bufferMap;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index ba18f75..aa7f572 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -41,7 +41,7 @@ public class ParquetValueReaders {
   }
 
   public static <T> ParquetValueReader<T> option(Type type, int definitionLevel,
-      ParquetValueReader<T> reader) {
+                                                 ParquetValueReader<T> reader) {
     if (type.isRepetition(Type.Repetition.OPTIONAL)) {
       return new OptionReader<>(definitionLevel, reader);
     }
@@ -416,7 +416,7 @@ public class ParquetValueReaders {
     private Iterator<E> elements = null;
 
     public ListReader(int definitionLevel, int repetitionLevel,
-        ParquetValueReader<E> reader) {
+                      ParquetValueReader<E> reader) {
       super(definitionLevel, repetitionLevel, reader);
     }
 
@@ -470,7 +470,7 @@ public class ParquetValueReaders {
     private final List<TripleIterator<?>> children;
 
     protected RepeatedKeyValueReader(int definitionLevel, int repetitionLevel,
-        ParquetValueReader<K> keyReader, ParquetValueReader<V> valueReader) {
+                           ParquetValueReader<K> keyReader, ParquetValueReader<V> valueReader) {
       this.definitionLevel = definitionLevel;
       this.repetitionLevel = repetitionLevel;
       this.keyReader = keyReader;
@@ -535,8 +535,8 @@ public class ParquetValueReaders {
     private Iterator<Map.Entry<K, V>> pairs = null;
 
     public MapReader(int definitionLevel, int repetitionLevel,
-        ParquetValueReader<K> keyReader,
-        ParquetValueReader<V> valueReader) {
+                     ParquetValueReader<K> keyReader,
+                     ParquetValueReader<V> valueReader) {
       super(definitionLevel, repetitionLevel, keyReader, valueReader);
     }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
index d097c67..633f9f8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
@@ -26,12 +26,12 @@ import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 
-public class ParquetWriteSupport<T> extends WriteSupport<T> {
+class ParquetWriteSupport<T> extends WriteSupport<T> {
   private final MessageType type;
   private final Map<String, String> keyValueMetadata;
   private final WriteSupport<T> wrapped;
 
-  public ParquetWriteSupport(MessageType type, Map<String, String> keyValueMetadata, WriteSupport<T> writeSupport) {
+  ParquetWriteSupport(MessageType type, Map<String, String> keyValueMetadata, WriteSupport<T> writeSupport) {
     this.type = type;
     this.keyValueMetadata = keyValueMetadata;
     this.wrapped = writeSupport;
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersFlatDataBenchmark.java
new file mode 100644
index 0000000..7ad24a7
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkBenchmarkUtil;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of reading Parquet data with a flat schema using
+ * Iceberg and Spark Parquet readers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetReadersFlatDataBenchmark
+ *       -PjmhOutputPath=benchmark/spark-parquet-readers-flat-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetReadersFlatDataBenchmark {
+
+  private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+      .impl(UnsafeProjection.class, InternalRow.class)
+      .build();
+  private static final Schema SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      required(2, "intCol", Types.IntegerType.get()),
+      required(3, "floatCol", Types.FloatType.get()),
+      optional(4, "doubleCol", Types.DoubleType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(6, "dateCol", Types.DateType.get()),
+      optional(7, "timestampCol", Types.TimestampType.withZone()),
+      optional(8, "stringCol", Types.StringType.get()));
+  private static final Schema PROJECTED_SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(8, "stringCol", Types.StringType.get()));
+  private static final int NUM_RECORDS = 10000000;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+    List<GenericData.Record> records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L);
+    try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile))
+        .schema(SCHEMA)
+        .named("benchmark")
+        .build()) {
+      writer.addAll(records);
+    }
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingSparkReader(Blackhole blackhole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersNestedDataBenchmark.java
new file mode 100644
index 0000000..11cf45c
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkBenchmarkUtil;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of reading nested Parquet data using
+ * Iceberg and Spark Parquet readers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetReadersNestedDataBenchmark
+ *       -PjmhOutputPath=benchmark/spark-parquet-readers-nested-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetReadersNestedDataBenchmark {
+
+  private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+      .impl(UnsafeProjection.class, InternalRow.class)
+      .build();
+  private static final Schema SCHEMA = new Schema(
+      required(0, "id", Types.LongType.get()),
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get()),
+          required(2, "col2", Types.DoubleType.get()),
+          required(3, "col3", Types.LongType.get())
+      ))
+  );
+  private static final Schema PROJECTED_SCHEMA = new Schema(
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get())
+      ))
+  );
+  private static final int NUM_RECORDS = 10000000;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet");
+    List<GenericData.Record> records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L);
+    try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile))
+        .schema(SCHEMA)
+        .named("benchmark")
+        .build()) {
+      writer.addAll(records);
+    }
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingSparkReader(Blackhole blackhole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackhole.consume(row);
+      }
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersFlatDataBenchmark.java
new file mode 100644
index 0000000..599aa06
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of writing Parquet data with a flat schema using
+ * Iceberg and Spark Parquet writers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetWritersFlatDataBenchmark
+ *       -PjmhOutputPath=benchmark/spark-parquet-writers-flat-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersFlatDataBenchmark {
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      required(2, "intCol", Types.IntegerType.get()),
+      required(3, "floatCol", Types.FloatType.get()),
+      optional(4, "doubleCol", Types.DoubleType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(6, "dateCol", Types.DateType.get()),
+      optional(7, "timestampCol", Types.TimestampType.withZone()),
+      optional(8, "stringCol", Types.StringType.get()));
+  private static final int NUM_RECORDS = 1000000;
+  private Iterable<InternalRow> rows;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+    dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingIcebergWriter() throws IOException {
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType))
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingSparkWriter() throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .writeSupport(new ParquetWriteSupport())
+        .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+        .set("spark.sql.parquet.writeLegacyFormat", "false")
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+}
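A quick way to sanity-check either write path outside JMH is to read the produced file back through the Iceberg Spark reader. This is only a sketch: it assumes the SCHEMA, NUM_RECORDS and dataFile fields above, plus imports for org.apache.iceberg.io.CloseableIterable and org.apache.iceberg.spark.data.SparkParquetReaders (the same reader used in TestSparkParquetWriter further down).

    // Sketch: read the benchmark output back and count rows.
    private void verifyWrittenFile() throws IOException {
      try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(dataFile))
          .project(SCHEMA)
          .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
          .build()) {
        long count = 0;
        for (InternalRow row : reader) {
          count += 1;                 // forces every row to be decoded
        }
        // count should equal NUM_RECORDS if the write completed successfully
      }
    }
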
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersNestedDataBenchmark.java
new file mode 100644
index 0000000..c43a885
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of writing Parquet data with a nested schema
+ * using Iceberg and Spark Parquet writers.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetWritersNestedDataBenchmark
+ *       -PjmhOutputPath=benchmark/spark-parquet-writers-nested-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersNestedDataBenchmark {
+
+  private static final Schema SCHEMA = new Schema(
+      required(0, "id", Types.LongType.get()),
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get()),
+          required(2, "col2", Types.DoubleType.get()),
+          required(3, "col3", Types.LongType.get())
+      ))
+  );
+  private static final int NUM_RECORDS = 1000000;
+  private Iterable<InternalRow> rows;
+  private File dataFile;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+    dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet");
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingIcebergWriter() throws IOException {
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType))
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingSparkWriter() throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .writeSupport(new ParquetWriteSupport())
+        .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+        .set("spark.sql.parquet.writeLegacyFormat", "false")
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+}
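The Spark writer path above hands its schema to ParquetWriteSupport as JSON through the org.apache.spark.sql.parquet.row.attributes key; that JSON is simply the converted Spark StructType. A minimal sketch, assuming the nested SCHEMA defined above:

    // Sketch: the StructType JSON that writeUsingSparkWriter() passes to ParquetWriteSupport.
    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
    System.out.println(sparkSchema.json());
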
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index a1927d5..97cd591 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -48,8 +48,8 @@ import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
 
 @Fork(1)
 @State(Scope.Benchmark)
-@Warmup(iterations = 2)
-@Measurement(iterations = 3)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
 @BenchmarkMode(Mode.SingleShotTime)
 public abstract class IcebergSourceBenchmark {
 
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
index 1f81236..9f9a070 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.types.Types;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
 
 public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
 
@@ -41,19 +42,18 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
   @Override
   protected final Table initTable() {
     Schema schema = new Schema(
-            optional(1, "longCol", Types.LongType.get()),
-            optional(2, "intCol", Types.LongType.get()),
-            optional(3, "floatCol", Types.LongType.get()),
-            optional(4, "doubleCol", Types.LongType.get()),
-            optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-            optional(6, "dateCol", Types.DateType.get()),
-            optional(7, "timestampCol", Types.TimestampType.withZone()),
-            optional(8, "stringCol", Types.StringType.get()));
+        required(1, "longCol", Types.LongType.get()),
+        required(2, "intCol", Types.IntegerType.get()),
+        required(3, "floatCol", Types.FloatType.get()),
+        optional(4, "doubleCol", Types.DoubleType.get()),
+        optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+        optional(6, "dateCol", Types.DateType.get()),
+        optional(7, "timestampCol", Types.TimestampType.withZone()),
+        optional(8, "stringCol", Types.StringType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = Maps.newHashMap();
     properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
-    properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
     return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
index 273d6b1..1c91645 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
@@ -76,52 +76,7 @@ public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFl
     tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
     withTableProperties(tableProperties, () -> {
       String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "10000")
-          .load(tableLocation).filter(FILTER_COND);
-      materialize(df);
-    });
-  }
-
-
-  @Benchmark
-  @Threads(1)
-  public void readWithFilterIceberg100k() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "100000")
-          .load(tableLocation).filter(FILTER_COND);
-      materialize(df);
-    });
-  }
-
-  @Benchmark
-  @Threads(1)
-  public void readWithFilterIceberg10k() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "10000")
-          .load(tableLocation).filter(FILTER_COND);
-      materialize(df);
-    });
-  }
-
-  @Benchmark
-  @Threads(1)
-  public void readWithFilterIceberg5k() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "5000")
-          .load(tableLocation).filter(FILTER_COND);
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND);
       materialize(df);
     });
   }
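
With the batch-size variants removed, the Iceberg read above can be compared against a filtered scan through Spark's built-in Parquet source. A sketch, assuming the dataLocation() helper, FILTER_COND constant, and the withSQLConf/materialize utilities of this benchmark class:

    // Sketch: the same filter evaluated by Spark's built-in vectorized Parquet reader.
    Map<String, String> conf = Maps.newHashMap();
    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
    withSQLConf(conf, () -> {
      Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
      materialize(df);
    });
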
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
similarity index 62%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
index 273d6b1..37a6336 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
@@ -37,25 +37,20 @@ import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.expr;
 
 /**
- * A benchmark that evaluates the file skipping capabilities in the Spark data source for Iceberg.
- *
- * This class uses a dataset with a flat schema, where the records are clustered according to the
- * column used in the filter predicate.
- *
- * The performance is compared to the built-in file source in Spark.
+ * A benchmark that evaluates the performance of reading Parquet data with a flat schema
+ * using Iceberg and the built-in file source in Spark.
  *
  * To run this benchmark:
  * <code>
  *   ./gradlew :iceberg-spark:jmh
- *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
- *       -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
+ *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
  * </code>
  */
-public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFlatDataBenchmark {
+public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlatDataBenchmark {
 
-  private static final String FILTER_COND = "dateCol == date_add(current_date(), 1)";
-  private static final int NUM_FILES = 500;
-  private static final int NUM_ROWS = 10000;
+  private static final int NUM_FILES = 10;
+  private static final int NUM_ROWS = 1000000;
 
   @Setup
   public void setupBenchmark() {
@@ -71,87 +66,78 @@ public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFl
 
   @Benchmark
   @Threads(1)
-  public void readWithFilterIceberg() {
+  public void readIceberg() {
     Map<String, String> tableProperties = Maps.newHashMap();
     tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
     withTableProperties(tableProperties, () -> {
       String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "10000")
-          .load(tableLocation).filter(FILTER_COND);
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
       materialize(df);
     });
   }
 
-
   @Benchmark
   @Threads(1)
-  public void readWithFilterIceberg100k() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "100000")
-          .load(tableLocation).filter(FILTER_COND);
+  public void readFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
       materialize(df);
     });
   }
 
   @Benchmark
   @Threads(1)
-  public void readWithFilterIceberg10k() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "10000")
-          .load(tableLocation).filter(FILTER_COND);
+  public void readFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
       materialize(df);
     });
   }
 
   @Benchmark
   @Threads(1)
-  public void readWithFilterIceberg5k() {
+  public void readWithProjectionIceberg() {
     Map<String, String> tableProperties = Maps.newHashMap();
     tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
     withTableProperties(tableProperties, () -> {
       String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "5000")
-          .load(tableLocation).filter(FILTER_COND);
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).select("longCol");
       materialize(df);
     });
   }
 
   @Benchmark
   @Threads(1)
-  public void readWithFilterFileSourceVectorized() {
+  public void readWithProjectionFileSourceVectorized() {
     Map<String, String> conf = Maps.newHashMap();
     conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
     conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
     withSQLConf(conf, () -> {
-      Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
       materialize(df);
     });
   }
 
   @Benchmark
   @Threads(1)
-  public void readWithFilterFileSourceNonVectorized() {
+  public void readWithProjectionFileSourceNonVectorized() {
     Map<String, String> conf = Maps.newHashMap();
     conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
     conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
     withSQLConf(conf, () -> {
-      Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
       materialize(df);
     });
   }
 
   private void appendData() {
-    for (int fileNum = 1; fileNum < NUM_FILES; fileNum++) {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
       Dataset<Row> df = spark().range(NUM_ROWS)
           .withColumnRenamed("id", "longCol")
           .withColumn("intCol", expr("CAST(longCol AS INT)"))
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
index 3a886c1..ab62f53 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
@@ -32,6 +32,8 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 
+import static org.apache.spark.sql.functions.expr;
+
 /**
  * A benchmark that evaluates the performance of writing Parquet data with a flat schema
  * using Iceberg and the built-in file source in Spark.
@@ -76,13 +78,13 @@ public class IcebergSourceFlatParquetDataWriteBenchmark extends IcebergSourceFla
   private Dataset<Row> benchmarkData() {
     return spark().range(NUM_ROWS)
         .withColumnRenamed("id", "longCol")
-        // .withColumn("intCol", expr("CAST(longCol AS INT)"))
-        // .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
-        // .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
-        // .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
-        // .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))"))
-        // .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
-        // .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+        .withColumn("intCol", expr("CAST(longCol AS INT)"))
+        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+        .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+        .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))"))
+        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+        .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
         .coalesce(1);
   }
 }
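
benchmarkData() now produces every column of the flat schema instead of only longCol. For reference, a common way to append such a DataFrame to an Iceberg table from Spark is sketched below; this is a general pattern, not necessarily the exact code in the elided write methods of this class, and tableLocation is a hypothetical path to an existing Iceberg table.

    // Sketch: append the benchmark DataFrame to an Iceberg table at a hypothetical location.
    Dataset<Row> df = benchmarkData();
    df.write()
        .format("iceberg")
        .mode("append")
        .save(tableLocation);
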
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index e8cc083..9a36266 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -118,10 +118,6 @@ public class SparkParquetReaders {
       this.type = type;
     }
 
-    protected MessageType getType() {
-      return type;
-    }
-
     @Override
     public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
                                          List<ParquetValueReader<?>> fieldReaders) {
@@ -364,7 +360,7 @@ public class SparkParquetReaders {
     }
   }
 
-  protected static class StringReader extends PrimitiveReader<UTF8String> {
+  private static class StringReader extends PrimitiveReader<UTF8String> {
     StringReader(ColumnDescriptor desc) {
       super(desc);
     }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index e8b91a8..f97de81 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -85,7 +85,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
-      DataSourceOptions options) {
+                                                 DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite,
         "Save mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
@@ -100,7 +100,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
 
   @Override
   public StreamWriter createStreamWriter(String runId, StructType dsStruct,
-      OutputMode mode, DataSourceOptions options) {
+                                         OutputMode mode, DataSourceOptions options) {
     Preconditions.checkArgument(
         mode == OutputMode.Append() || mode == OutputMode.Complete(),
         "Output mode %s is not supported", mode);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index fe7811e..ec51e9c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -101,10 +101,10 @@ class Reader implements DataSourceReader,
     SupportsPushDownFilters,
     SupportsPushDownRequiredColumns,
     SupportsReportStatistics {
+  private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
 
   private static final Filter[] NO_FILTERS = new Filter[0];
 
-  private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
   private final Table table;
   private final Long snapshotId;
   private final Long asOfTimestamp;
@@ -395,7 +395,7 @@ class Reader implements DataSourceReader,
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
       return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo,
-          encryptionManager, caseSensitive);
+        encryptionManager, caseSensitive);
     }
 
     private Schema lazyTableSchema() {
@@ -431,7 +431,7 @@ class Reader implements DataSourceReader,
     private InternalRow current = null;
 
     TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
-        EncryptionManager encryptionManager, boolean caseSensitive) {
+                   EncryptionManager encryptionManager, boolean caseSensitive) {
       this.fileIo = fileIo;
       this.tasks = task.files().iterator();
       this.tableSchema = tableSchema;
@@ -585,8 +585,8 @@ class Reader implements DataSourceReader,
     }
 
     private CloseableIterable<InternalRow> newAvroIterable(InputFile location,
-        FileScanTask task,
-        Schema readSchema) {
+                                                      FileScanTask task,
+                                                      Schema readSchema) {
       return Avro.read(location)
           .reuseContainers()
           .project(readSchema)
@@ -596,8 +596,8 @@ class Reader implements DataSourceReader,
     }
 
     private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
-        FileScanTask task,
-        Schema readSchema) {
+                                                            FileScanTask task,
+                                                            Schema readSchema) {
       return Parquet.read(location)
           .project(readSchema)
           .split(task.start(), task.length())
@@ -608,8 +608,8 @@ class Reader implements DataSourceReader,
     }
 
     private CloseableIterable<InternalRow> newOrcIterable(InputFile location,
-        FileScanTask task,
-        Schema readSchema) {
+                                                          FileScanTask task,
+                                                          Schema readSchema) {
       return ORC.read(location)
           .schema(readSchema)
           .split(task.start(), task.length())
@@ -627,7 +627,7 @@ class Reader implements DataSourceReader,
     }
   }
 
-  public static class PartitionRowConverter implements Function<StructLike, InternalRow> {
+  private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
     private final DataType[] types;
     private final int[] positions;
     private final Class<?>[] javaTypes;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 111cbd3..f20bffc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -257,8 +257,8 @@ class Writer implements DataSourceWriter {
     private final Schema dsSchema;
 
     WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-        Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
-        long targetFileSize, Schema dsSchema) {
+                  Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
+                  long targetFileSize, Schema dsSchema) {
       this.spec = spec;
       this.format = format;
       this.locations = locations;
@@ -372,7 +372,7 @@ class Writer implements DataSourceWriter {
     private long currentRows = 0;
 
     BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
-        WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
+               WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       this.spec = spec;
       this.format = format;
       this.appenderFactory = appenderFactory;
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index 2ffc81f..5458c1e 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -59,13 +59,13 @@ object SparkTableUtil {
   }
 
   /**
-   * Returns a DataFrame with a row for each partition that matches the specified 'expression'.
-   *
-   * @param spark a Spark session.
-   * @param table name of the table.
-   * @param expression The expression whose matching partitions are returned.
-   * @return a DataFrame of the table partitions.
-   */
+    * Returns a DataFrame with a row for each partition that matches the specified 'expression'.
+    *
+    * @param spark a Spark session.
+    * @param table name of the table.
+    * @param expression The expression whose matching partitions are returned.
+    * @return a DataFrame of the table partitions.
+    */
   def partitionDFByFilter(spark: SparkSession, table: String, expression: String): DataFrame = {
     import spark.implicits._
 
@@ -125,9 +125,9 @@ object SparkTableUtil {
    * @return matching table's partitions
    */
   def getPartitionsByFilter(
-                             spark: SparkSession,
-                             tableIdent: TableIdentifier,
-                             predicateExpr: Expression): Seq[SparkPartition] = {
+      spark: SparkSession,
+      tableIdent: TableIdentifier,
+      predicateExpr: Expression): Seq[SparkPartition] = {
 
     val catalog = spark.sessionState.catalog
     val catalogTable = catalog.getTableMetadata(tableIdent)
@@ -155,9 +155,9 @@ object SparkTableUtil {
    * @return a Seq of [[SparkDataFile]]
    */
   def listPartition(
-                     partition: SparkPartition,
-                     conf: SerializableConfiguration,
-                     metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
+      partition: SparkPartition,
+      conf: SerializableConfiguration,
+      metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
 
     listPartition(partition.values, partition.uri, partition.format, conf.get(), metricsConfig)
   }
@@ -176,11 +176,11 @@ object SparkTableUtil {
    * @return a seq of [[SparkDataFile]]
    */
   def listPartition(
-                     partition: Map[String, String],
-                     uri: String,
-                     format: String,
-                     conf: Configuration = new Configuration(),
-                     metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
+      partition: Map[String, String],
+      uri: String,
+      format: String,
+      conf: Configuration = new Configuration(),
+      metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
 
     if (format.contains("avro")) {
       listAvroPartition(partition, uri, conf)
@@ -203,18 +203,18 @@ object SparkTableUtil {
    * Case class representing a data file.
    */
   case class SparkDataFile(
-                            path: String,
-                            partition: collection.Map[String, String],
-                            format: String,
-                            fileSize: Long,
-                            rowGroupSize: Long,
-                            rowCount: Long,
-                            columnSizes: Array[Long],
-                            valueCounts: Array[Long],
-                            nullValueCounts: Array[Long],
-                            lowerBounds: Seq[Array[Byte]],
-                            upperBounds: Seq[Array[Byte]]
-                          ) {
+      path: String,
+      partition: collection.Map[String, String],
+      format: String,
+      fileSize: Long,
+      rowGroupSize: Long,
+      rowCount: Long,
+      columnSizes: Array[Long],
+      valueCounts: Array[Long],
+      nullValueCounts: Array[Long],
+      lowerBounds: Seq[Array[Byte]],
+      upperBounds: Seq[Array[Byte]]
+    ) {
 
     /**
      * Convert this to a [[DataFile]] that can be added to a [[org.apache.iceberg.Table]].
@@ -255,7 +255,7 @@ object SparkTableUtil {
         val copy = if (buffer.hasArray) {
           val bytes = buffer.array()
           if (buffer.arrayOffset() == 0 && buffer.position() == 0 &&
-            bytes.length == buffer.remaining()) {
+              bytes.length == buffer.remaining()) {
             bytes
           } else {
             val start = buffer.arrayOffset() + buffer.position()
@@ -326,9 +326,9 @@ object SparkTableUtil {
   }
 
   private def listAvroPartition(
-                                 partitionPath: Map[String, String],
-                                 partitionUri: String,
-                                 conf: Configuration): Seq[SparkDataFile] = {
+      partitionPath: Map[String, String],
+      partitionUri: String,
+      conf: Configuration): Seq[SparkDataFile] = {
     val partition = new Path(partitionUri)
     val fs = partition.getFileSystem(conf)
 
@@ -348,10 +348,10 @@ object SparkTableUtil {
 
   //noinspection ScalaDeprecation
   private def listParquetPartition(
-                                    partitionPath: Map[String, String],
-                                    partitionUri: String,
-                                    conf: Configuration,
-                                    metricsSpec: MetricsConfig): Seq[SparkDataFile] = {
+      partitionPath: Map[String, String],
+      partitionUri: String,
+      conf: Configuration,
+      metricsSpec: MetricsConfig): Seq[SparkDataFile] = {
     val partition = new Path(partitionUri)
     val fs = partition.getFileSystem(conf)
 
@@ -372,9 +372,9 @@ object SparkTableUtil {
   }
 
   private def listOrcPartition(
-                                partitionPath: Map[String, String],
-                                partitionUri: String,
-                                conf: Configuration): Seq[SparkDataFile] = {
+      partitionPath: Map[String, String],
+      partitionUri: String,
+      conf: Configuration): Seq[SparkDataFile] = {
     val partition = new Path(partitionUri)
     val fs = partition.getFileSystem(conf)
 
@@ -418,9 +418,9 @@ object SparkTableUtil {
   }
 
   private def buildManifest(
-                             conf: SerializableConfiguration,
-                             spec: PartitionSpec,
-                             basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = { files =>
+      conf: SerializableConfiguration,
+      spec: PartitionSpec,
+      basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = { files =>
     if (files.hasNext) {
       val io = new HadoopFileIO(conf.get())
       val ctx = TaskContext.get()
@@ -477,10 +477,10 @@ object SparkTableUtil {
    * @param stagingDir a staging directory to store temporary manifest files
    */
   def importSparkTable(
-                        spark: SparkSession,
-                        sourceTableIdent: TableIdentifier,
-                        targetTable: Table,
-                        stagingDir: String): Unit = {
+      spark: SparkSession,
+      sourceTableIdent: TableIdentifier,
+      targetTable: Table,
+      stagingDir: String): Unit = {
 
     val catalog = spark.sessionState.catalog
 
@@ -502,9 +502,9 @@ object SparkTableUtil {
   }
 
   private def importUnpartitionedSparkTable(
-                                             spark: SparkSession,
-                                             sourceTableIdent: TableIdentifier,
-                                             targetTable: Table): Unit = {
+      spark: SparkSession,
+      sourceTableIdent: TableIdentifier,
+      targetTable: Table): Unit = {
 
     val sourceTable = spark.sessionState.catalog.getTableMetadata(sourceTableIdent)
     val format = sourceTable.storage.serde.orElse(sourceTable.provider)
@@ -530,11 +530,11 @@ object SparkTableUtil {
    * @param stagingDir a staging directory to store temporary manifest files
    */
   def importSparkPartitions(
-                             spark: SparkSession,
-                             partitions: Seq[SparkPartition],
-                             targetTable: Table,
-                             spec: PartitionSpec,
-                             stagingDir: String): Unit = {
+      spark: SparkSession,
+      partitions: Seq[SparkPartition],
+      targetTable: Table,
+      spec: PartitionSpec,
+      stagingDir: String): Unit = {
 
     import spark.implicits._
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index 3c9d7b6..7235807 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -55,9 +55,7 @@ public class RandomData {
     RandomDataGenerator generator = new RandomDataGenerator(schema, seed);
     List<Record> records = Lists.newArrayListWithExpectedSize(numRecords);
     for (int i = 0; i < numRecords; i += 1) {
-      Record rec = (Record) TypeUtil.visit(schema, generator);
-      //System.out.println("Add record "+rec);
-      records.add(rec);
+      records.add((Record) TypeUtil.visit(schema, generator));
     }
 
     return records;
@@ -469,6 +467,7 @@ public class RandomData {
     for (int i = 0; i < length; i += 1) {
       buffer[i] = (byte) CHARS.charAt(random.nextInt(CHARS.length()));
     }
+
     return UTF8String.fromBytes(buffer);
   }
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 03d093c..da182b9 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -68,8 +68,7 @@ import static scala.collection.JavaConverters.seqAsJavaListConverter;
 @SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
 public class TestHelpers {
 
-  private TestHelpers() {
-  }
+  private TestHelpers() {}
 
   public static void assertEqualsSafe(Types.StructType struct, Record rec, Row row) {
     List<Types.NestedField> fields = struct.fields();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
index bafbec4..1466dea 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
@@ -104,7 +104,7 @@ public class TestParquetAvroReader {
           .project(structSchema)
           .createReaderFunc(
               fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema))
-          .build()) {
+           .build()) {
         long start = System.currentTimeMillis();
         long val = 0;
         long count = 0;
@@ -164,10 +164,10 @@ public class TestParquetAvroReader {
       System.gc();
 
       try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
-          .project(COMPLEX_SCHEMA)
-          .createReaderFunc(
-              fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
-          .build()) {
+           .project(COMPLEX_SCHEMA)
+           .createReaderFunc(
+               fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+           .build()) {
         long start = System.currentTimeMillis();
         long val = 0;
         long count = 0;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
index ba4f53a..0e97c37 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
@@ -19,9 +19,21 @@
 
 package org.apache.iceberg.spark.data;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetAvroValueReaders;
+import org.apache.iceberg.parquet.ParquetAvroWriter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -62,34 +74,34 @@ public class TestParquetAvroWriter {
 
   @Test
   public void testCorrectness() throws IOException {
-    // Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139);
-    //
-    // File testFile = temp.newFile();
-    // Assert.assertTrue("Delete should succeed", testFile.delete());
-    //
-    // try (FileAppender<Record> writer = Parquet.write(Files.localOutput(testFile))
-    //     .schema(COMPLEX_SCHEMA)
-    //     .createWriterFunc(ParquetAvroWriter::buildWriter)
-    //     .build()) {
-    //   writer.addAll(records);
-    // }
-    //
-    // // RandomData uses the root record name "test", which must match for records to be equal
-    // MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
-    //
-    // // verify that the new read path is correct
-    // try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
-    //     .project(COMPLEX_SCHEMA)
-    //     .createReaderFunc(
-    //         fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
-    //     .build()) {
-    //   int recordNum = 0;
-    //   Iterator<Record> iter = records.iterator();
-    //   for (Record actual : reader) {
-    //     Record expected = iter.next();
-    //     Assert.assertEquals("Record " + recordNum + " should match expected", expected, actual);
-    //     recordNum += 1;
-    //   }
-    // }
+    Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<Record> writer = Parquet.write(Files.localOutput(testFile))
+        .schema(COMPLEX_SCHEMA)
+        .createWriterFunc(ParquetAvroWriter::buildWriter)
+        .build()) {
+      writer.addAll(records);
+    }
+
+    // RandomData uses the root record name "test", which must match for records to be equal
+    MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
+
+    // verify that the new read path is correct
+    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+        .project(COMPLEX_SCHEMA)
+        .createReaderFunc(
+            fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+        .build()) {
+      int recordNum = 0;
+      Iterator<Record> iter = records.iterator();
+      for (Record actual : reader) {
+        Record expected = iter.next();
+        Assert.assertEquals("Record " + recordNum + " should match expected", expected, actual);
+        recordNum += 1;
+      }
+    }
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index ace908e..5e22cce 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -19,37 +19,52 @@
 
 package org.apache.iceberg.spark.data;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Assume;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
 
 public class TestSparkParquetReader extends AvroDataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    // Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
-    //     type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
-    //
-    // List<GenericData.Record> expected = RandomData.generateList(schema, 100, 0L);
-    //
-    // File testFile = temp.newFile();
-    // Assert.assertTrue("Delete should succeed", testFile.delete());
-    //
-    // try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(testFile))
-    //     .schema(schema)
-    //     .named("test")
-    //     .build()) {
-    //   writer.addAll(expected);
-    // }
-    //
-    // try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
-    //     .project(schema)
-    //     .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
-    //     .build()) {
-    //   Iterator<InternalRow> rows = reader.iterator();
-    //   for (int i = 0; i < expected.size(); i += 1) {
-    //     Assert.assertTrue("Should have expected number of rows", rows.hasNext());
-    //     assertEqualsUnsafe(schema.asStruct(), expected.get(i), rows.next());
-    //   }
-    //   Assert.assertFalse("Should not have extra rows", rows.hasNext());
-    // }
+    Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
+        type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
+
+    List<GenericData.Record> expected = RandomData.generateList(schema, 100, 0L);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(testFile))
+        .schema(schema)
+        .named("test")
+        .build()) {
+      writer.addAll(expected);
+    }
+
+    try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
+        .project(schema)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+        .build()) {
+      Iterator<InternalRow> rows = reader.iterator();
+      for (int i = 0; i < expected.size(); i += 1) {
+        Assert.assertTrue("Should have expected number of rows", rows.hasNext());
+        assertEqualsUnsafe(schema.asStruct(), expected.get(i), rows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", rows.hasNext());
+    }
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
index 5688596..00f95f3 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -19,9 +19,19 @@
 
 package org.apache.iceberg.spark.data;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -58,32 +68,32 @@ public class TestSparkParquetWriter {
       optional(2, "slide", Types.StringType.get())
   );
 
-  // @Test
-  // public void testCorrectness() throws IOException {
-  //   int numRows = 250_000;
-  //   Iterable<InternalRow> records = RandomData.generateSpark(COMPLEX_SCHEMA, numRows, 19981);
-  //
-  //   File testFile = temp.newFile();
-  //   Assert.assertTrue("Delete should succeed", testFile.delete());
-  //
-  //   try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(testFile))
-  //       .schema(COMPLEX_SCHEMA)
-  //       .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(COMPLEX_SCHEMA, msgType))
-  //       .build()) {
-  //     writer.addAll(records);
-  //   }
-  //
-  //   try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
-  //       .project(COMPLEX_SCHEMA)
-  //       .createReaderFunc(type -> SparkParquetReaders.buildReader(COMPLEX_SCHEMA, type))
-  //       .build()) {
-  //     Iterator<InternalRow> expected = records.iterator();
-  //     Iterator<InternalRow> rows = reader.iterator();
-  //     for (int i = 0; i < numRows; i += 1) {
-  //       Assert.assertTrue("Should have expected number of rows", rows.hasNext());
-  //       TestHelpers.assertEquals(COMPLEX_SCHEMA, expected.next(), rows.next());
-  //     }
-  //     Assert.assertFalse("Should not have extra rows", rows.hasNext());
-  //   }
-  // }
+  @Test
+  public void testCorrectness() throws IOException {
+    int numRows = 250_000;
+    Iterable<InternalRow> records = RandomData.generateSpark(COMPLEX_SCHEMA, numRows, 19981);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(testFile))
+        .schema(COMPLEX_SCHEMA)
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(COMPLEX_SCHEMA, msgType))
+        .build()) {
+      writer.addAll(records);
+    }
+
+    try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
+        .project(COMPLEX_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(COMPLEX_SCHEMA, type))
+        .build()) {
+      Iterator<InternalRow> expected = records.iterator();
+      Iterator<InternalRow> rows = reader.iterator();
+      for (int i = 0; i < numRows; i += 1) {
+        Assert.assertTrue("Should have expected number of rows", rows.hasNext());
+        TestHelpers.assertEquals(COMPLEX_SCHEMA, expected.next(), rows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", rows.hasNext());
+    }
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
index 13d5a21..c935bfc 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -67,11 +67,11 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
 
     // Create a spark session.
     TestSparkTableUtil.spark = SparkSession.builder().master("local[2]")
-        .enableHiveSupport()
-        .config("spark.hadoop.hive.metastore.uris", metastoreURI)
-        .config("hive.exec.dynamic.partition", "true")
-        .config("hive.exec.dynamic.partition.mode", "nonstrict")
-        .getOrCreate();
+            .enableHiveSupport()
+            .config("spark.hadoop.hive.metastore.uris", metastoreURI)
+            .config("hive.exec.dynamic.partition", "true")
+            .config("hive.exec.dynamic.partition.mode", "nonstrict")
+            .getOrCreate();
   }
 
   @AfterClass
@@ -89,24 +89,24 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
     SQLContext sc = new SQLContext(TestSparkTableUtil.spark);
 
     sc.sql(String.format(
-        "CREATE TABLE %s (\n" +
-            "    id int COMMENT 'unique id'\n" +
-            ")\n" +
-            " PARTITIONED BY (data string)\n" +
-            " LOCATION '%s'", qualifiedTableName, tableLocationStr)
+                    "CREATE TABLE %s (\n" +
+                    "    id int COMMENT 'unique id'\n" +
+                    ")\n" +
+                    " PARTITIONED BY (data string)\n" +
+                    " LOCATION '%s'", qualifiedTableName, tableLocationStr)
     );
 
     List<SimpleRecord> expected = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b"),
-        new SimpleRecord(3, "c")
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c")
     );
 
     Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
 
     df.select("id", "data").orderBy("data").write()
-        .mode("append")
-        .insertInto(qualifiedTableName);
+            .mode("append")
+            .insertInto(qualifiedTableName);
   }
 
   @After
@@ -141,14 +141,14 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
   public void testImportPartitionedTable() throws Exception {
     File location = temp.newFolder("partitioned_table");
     spark.table(qualifiedTableName).write().mode("overwrite").partitionBy("data").format("parquet")
-        .saveAsTable("test_partitioned_table");
+            .saveAsTable("test_partitioned_table");
     TableIdentifier source = spark.sessionState().sqlParser()
-        .parseTableIdentifier("test_partitioned_table");
+            .parseTableIdentifier("test_partitioned_table");
     HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
     Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, qualifiedTableName),
-        SparkSchemaUtil.specForTable(spark, qualifiedTableName),
-        ImmutableMap.of(),
-        location.getCanonicalPath());
+            SparkSchemaUtil.specForTable(spark, qualifiedTableName),
+            ImmutableMap.of(),
+            location.getCanonicalPath());
     File stagingDir = temp.newFolder("staging-dir");
     SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
     long count = spark.read().format("iceberg").load(location.toString()).count();
@@ -159,14 +159,14 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
   public void testImportUnpartitionedTable() throws Exception {
     File location = temp.newFolder("unpartitioned_table");
     spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
-        .saveAsTable("test_unpartitioned_table");
+            .saveAsTable("test_unpartitioned_table");
     TableIdentifier source = spark.sessionState().sqlParser()
-        .parseTableIdentifier("test_unpartitioned_table");
+            .parseTableIdentifier("test_unpartitioned_table");
     HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
     Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, qualifiedTableName),
-        SparkSchemaUtil.specForTable(spark, qualifiedTableName),
-        ImmutableMap.of(),
-        location.getCanonicalPath());
+            SparkSchemaUtil.specForTable(spark, qualifiedTableName),
+            ImmutableMap.of(),
+            location.getCanonicalPath());
     File stagingDir = temp.newFolder("staging-dir");
     SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
     long count = spark.read().format("iceberg").load(location.toString()).count();
@@ -176,24 +176,24 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
   @Test
   public void testImportAsHiveTable() throws Exception {
     spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
-        .saveAsTable("unpartitioned_table");
+            .saveAsTable("unpartitioned_table");
     TableIdentifier source = new TableIdentifier("unpartitioned_table");
     Table table = catalog.createTable(
-        org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_unpartitioned_table"),
-        SparkSchemaUtil.schemaForTable(spark, "unpartitioned_table"),
-        SparkSchemaUtil.specForTable(spark, "unpartitioned_table"));
+            org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_unpartitioned_table"),
+            SparkSchemaUtil.schemaForTable(spark, "unpartitioned_table"),
+            SparkSchemaUtil.specForTable(spark, "unpartitioned_table"));
     File stagingDir = temp.newFolder("staging-dir");
     SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
     long count1 = spark.read().format("iceberg").load(DB_NAME + ".test_unpartitioned_table").count();
     Assert.assertEquals("three values ", 3, count1);
 
     spark.table(qualifiedTableName).write().mode("overwrite").partitionBy("data").format("parquet")
-        .saveAsTable("partitioned_table");
+            .saveAsTable("partitioned_table");
     source = new TableIdentifier("partitioned_table");
     table = catalog.createTable(
-        org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_partitioned_table"),
-        SparkSchemaUtil.schemaForTable(spark, "partitioned_table"),
-        SparkSchemaUtil.specForTable(spark, "partitioned_table"));
+            org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_partitioned_table"),
+            SparkSchemaUtil.schemaForTable(spark, "partitioned_table"),
+            SparkSchemaUtil.specForTable(spark, "partitioned_table"));
 
     SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
     long count2 = spark.read().format("iceberg").load(DB_NAME + ".test_partitioned_table").count();