Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/15 22:16:27 UTC

[iceberg] branch master updated: Spark: Support vectorized Parquet reads for flat projections (#828)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ffdcf09  Spark: Support vectorized Parquet reads for flat projections (#828)
ffdcf09 is described below

commit ffdcf09027e09460b7d7505e65aea119107934a3
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Mon Jun 15 15:16:19 2020 -0700

    Spark: Support vectorized Parquet reads for flat projections (#828)
---
 .../org/apache/iceberg/arrow/ArrowAllocation.java  |  37 ++
 .../arrow/vectorized/IcebergArrowVectors.java      |  33 +-
 .../iceberg/arrow/vectorized/VectorHolder.java     |  53 ++-
 .../arrow/vectorized/VectorizedArrowReader.java    | 131 +++---
 .../parquet/VectorizedColumnIterator.java          |  18 +-
 ...orizedDictionaryEncodedParquetValuesReader.java | 141 +++---
 .../vectorized/parquet/VectorizedPageIterator.java |  39 +-
 .../VectorizedParquetDefinitionLevelReader.java    |  80 ++--
 build.gradle                                       |  24 +
 .../java/org/apache/iceberg/TableProperties.java   |   6 +
 .../org/apache/iceberg/parquet/ParquetUtil.java    |   1 +
 .../iceberg/parquet/VectorizedParquetReader.java   |  17 +-
 .../apache/iceberg/parquet/VectorizedReader.java   |  16 +-
 .../spark/source/IcebergSourceBenchmark.java       |  20 +-
 .../spark/data/vectorized/ArrowVectorAccessor.java |  95 ++++
 .../data/vectorized/ArrowVectorAccessors.java      | 508 +++++++++++++++++++++
 .../spark/data/vectorized/ColumnarBatchReader.java | 110 +++++
 .../data/vectorized/IcebergArrowColumnVector.java  | 153 +++++++
 .../data/vectorized/NullValuesColumnVector.java    | 125 +++++
 .../vectorized/VectorizedSparkParquetReaders.java  | 131 ++++++
 .../iceberg/spark/source/BatchDataReader.java      |  76 +++
 .../org/apache/iceberg/spark/source/Reader.java    | 128 +++++-
 .../apache/iceberg/spark/data/AvroDataTest.java    |  18 +
 .../org/apache/iceberg/spark/data/RandomData.java  | 124 ++++-
 .../org/apache/iceberg/spark/data/TestHelpers.java |  29 +-
 .../iceberg/spark/data/TestParquetAvroReader.java  |   2 +-
 .../iceberg/spark/data/TestParquetAvroWriter.java  |   2 +-
 .../iceberg/spark/data/TestSparkParquetWriter.java |   2 +-
 ...estParquetDictionaryEncodedVectorizedReads.java |  42 ++
 ...naryFallbackToPlainEncodingVectorizedReads.java |  70 +++
 .../vectorized/TestParquetVectorizedReads.java     | 196 ++++++++
 .../iceberg/spark/source/TestParquetScan.java      |  25 +-
 .../spark/source/TestSparkReadProjection.java      |  23 +-
 33 files changed, 2169 insertions(+), 306 deletions(-)
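
The changes above add a vectorized read path for flat (non-nested) projections: Parquet pages are decoded in batches into Arrow vectors by the new arrow-module readers, and the Spark side exposes them as columnar batches through BatchDataReader and the vectorized accessors. A minimal sketch of switching the feature on through table properties follows; the property keys are assumptions, since the diffstat only shows that TableProperties gains new entries — check TableProperties.java for the exact names.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class EnableVectorizedReads {
      public static void main(String[] args) {
        // Load an existing table by location; adjust the path for your environment.
        Table table = new HadoopTables(new Configuration()).load("hdfs://nn/path/to/table");

        // Assumed property keys; the batch size defaults to 5000 in VectorizedArrowReader.
        table.updateProperties()
            .set("read.parquet.vectorization.enabled", "true")
            .set("read.parquet.vectorization.batch-size", "5000")
            .commit();
      }
    }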

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/ArrowAllocation.java b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowAllocation.java
new file mode 100644
index 0000000..49882ce
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowAllocation.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow;
+
+import org.apache.arrow.memory.RootAllocator;
+
+public class ArrowAllocation {
+  static {
+    ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  private static final RootAllocator ROOT_ALLOCATOR;
+
+  private ArrowAllocation() {
+  }
+
+  public static RootAllocator rootAllocator() {
+    return ROOT_ALLOCATOR;
+  }
+}
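
ArrowAllocation pins one process-wide RootAllocator with no memory cap; the vectorized readers allocate their Arrow vectors against it. A minimal sketch, assuming a caller wants to bound how much Arrow memory a single read task can take by carving off a child allocator (the allocator name and limit below are arbitrary, not part of this commit):

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.iceberg.arrow.ArrowAllocation;

    public class ChildAllocatorSketch {
      public static void main(String[] args) {
        // Child allocators draw from the shared root but enforce their own limit.
        try (BufferAllocator taskAllocator = ArrowAllocation.rootAllocator()
            .newChildAllocator("parquet-batch-read", 0, 512L * 1024 * 1024)) {
          // allocate FieldVectors against taskAllocator here
          System.out.println("allocated bytes: " + taskAllocator.getAllocatedMemory());
        }
      }
    }
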
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/IcebergArrowVectors.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/IcebergArrowVectors.java
index d6fa260..a82fa57 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/IcebergArrowVectors.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/IcebergArrowVectors.java
@@ -21,8 +21,8 @@ package org.apache.iceberg.arrow.vectorized;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
 
 /**
  * The general way of getting a value at an index in the Arrow vector
@@ -65,37 +65,8 @@ public class IcebergArrowVectors {
   }
 
   /**
-   * Extension of Arrow's @{@link VarBinaryVector}. The whole reason of having this implementation is to override the
-   * expensive {@link VarBinaryVector#isSet(int)} method.
-   */
-  public static class VarBinaryArrowVector extends VarBinaryVector {
-    private NullabilityHolder nullabilityHolder;
-
-    public VarBinaryArrowVector(
-        String name,
-        BufferAllocator allocator) {
-      super(name, allocator);
-    }
-
-    /**
-     * Same as {@link #isNull(int)}.
-     *
-     * @param index position of element
-     * @return 1 if element at given index is not null, 0 otherwise
-     */
-    @Override
-    public int isSet(int index) {
-      return nullabilityHolder.isNullAt(index) ^ 1;
-    }
-
-    public void setNullabilityHolder(NullabilityHolder nullabilityHolder) {
-      this.nullabilityHolder = nullabilityHolder;
-    }
-  }
-
-  /**
    * Extension of Arrow's @{@link VarCharVector}. The reason of having this implementation is to override the expensive
-   * {@link VarCharVector#isSet(int)} method.
+   * {@link VarCharVector#isSet(int)} method called by {@link VarCharVector#get(int, NullableVarCharHolder)}
    */
   public static class VarcharArrowVector extends VarCharVector {
 
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
index d59292f..b938d38 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
@@ -19,35 +19,48 @@
 
 package org.apache.iceberg.arrow.vectorized;
 
+import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.types.Type;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
 
 /**
- * Container class for holding the Arrow vector holding a batch of values along with other state needed for reading
+ * Container class for holding the Arrow vector storing a batch of values along with other state needed for reading
  * values out of it.
  */
 public class VectorHolder {
   private final ColumnDescriptor columnDescriptor;
   private final FieldVector vector;
   private final boolean isDictionaryEncoded;
-
   private final Dictionary dictionary;
   private final NullabilityHolder nullabilityHolder;
-
-  public static final VectorHolder NULL_VECTOR_HOLDER = new VectorHolder(null, null, false, null, null);
+  private final Type icebergType;
 
   public VectorHolder(
-      ColumnDescriptor columnDescriptor,
-      FieldVector vector,
-      boolean isDictionaryEncoded,
-      Dictionary dictionary,
-      NullabilityHolder holder) {
+      ColumnDescriptor columnDescriptor, FieldVector vector, boolean isDictionaryEncoded,
+      Dictionary dictionary, NullabilityHolder holder, Type type) {
+    // All the fields except dictionary are not nullable unless it is a dummy holder
+    Preconditions.checkNotNull(columnDescriptor, "ColumnDescriptor cannot be null");
+    Preconditions.checkNotNull(vector, "Vector cannot be null");
+    Preconditions.checkNotNull(holder, "NullabilityHolder cannot be null");
+    Preconditions.checkNotNull(type, "IcebergType cannot be null");
     this.columnDescriptor = columnDescriptor;
     this.vector = vector;
     this.isDictionaryEncoded = isDictionaryEncoded;
     this.dictionary = dictionary;
     this.nullabilityHolder = holder;
+    this.icebergType = type;
+  }
+
+  // Only used for returning dummy holder
+  private VectorHolder() {
+    columnDescriptor = null;
+    vector = null;
+    isDictionaryEncoded = false;
+    dictionary = null;
+    nullabilityHolder = null;
+    icebergType = null;
   }
 
   public ColumnDescriptor descriptor() {
@@ -69,4 +82,26 @@ public class VectorHolder {
   public NullabilityHolder nullabilityHolder() {
     return nullabilityHolder;
   }
+
+  public Type icebergType() {
+    return icebergType;
+  }
+
+  public int numValues() {
+    return vector.getValueCount();
+  }
+
+  public static VectorHolder dummyHolder(int numRows) {
+    return new VectorHolder() {
+      @Override
+      public int numValues() {
+        return numRows;
+      }
+    };
+  }
+
+  public boolean isDummy() {
+    return vector == null;
+  }
+
 }
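
VectorHolder now validates its non-null fields, carries the column's Iceberg type, and replaces the old NULL_VECTOR_HOLDER constant with dummyHolder(numRows): a holder with no backing vector that still reports how many rows the batch covers, which is what the readers use for columns missing from the data file. A small usage sketch:

    import org.apache.iceberg.arrow.vectorized.VectorHolder;

    public class DummyHolderSketch {
      public static void main(String[] args) {
        // A column absent from the file contributes only a row count to the batch.
        VectorHolder missingColumn = VectorHolder.dummyHolder(1024);
        System.out.println(missingColumn.isDummy());   // true
        System.out.println(missingColumn.numValues()); // 1024
      }
    }
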
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index cbe3eac..dbde001 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -48,9 +48,8 @@ import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.PrimitiveType;
 
 /**
- * {@link VectorizedReader VectorReader(s)} that read in a batch of values into Arrow vectors.
- * It also takes care of allocating the right kind of Arrow vectors depending on the corresponding
- * Iceberg/Parquet data types.
+ * {@link VectorizedReader VectorReader(s)} that read in a batch of values into Arrow vectors. It also takes care of
+ * allocating the right kind of Arrow vectors depending on the corresponding Iceberg/Parquet data types.
  */
 public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
   public static final int DEFAULT_BATCH_SIZE = 5000;
@@ -58,33 +57,30 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
   private static final int AVERAGE_VARIABLE_WIDTH_RECORD_SIZE = 10;
 
   private final ColumnDescriptor columnDescriptor;
-  private final int batchSize;
   private final VectorizedColumnIterator vectorizedColumnIterator;
   private final Types.NestedField icebergField;
   private final BufferAllocator rootAlloc;
+
+  private int batchSize;
   private FieldVector vec;
   private Integer typeWidth;
   private ReadType readType;
-  private boolean reuseContainers = true;
   private NullabilityHolder nullabilityHolder;
 
   // In cases when Parquet employs fall back to plain encoding, we eagerly decode the dictionary encoded pages
   // before storing the values in the Arrow vector. This means even if the dictionary is present, data
   // present in the vector may not necessarily be dictionary encoded.
   private Dictionary dictionary;
-  private boolean allPagesDictEncoded;
 
   public VectorizedArrowReader(
       ColumnDescriptor desc,
       Types.NestedField icebergField,
       BufferAllocator ra,
-      int batchSize,
       boolean setArrowValidityVector) {
     this.icebergField = icebergField;
-    this.batchSize = (batchSize == 0) ? DEFAULT_BATCH_SIZE : batchSize;
     this.columnDescriptor = desc;
     this.rootAlloc = ra;
-    this.vectorizedColumnIterator = new VectorizedColumnIterator(desc, "", batchSize, setArrowValidityVector);
+    this.vectorizedColumnIterator = new VectorizedColumnIterator(desc, "", setArrowValidityVector);
   }
 
   private VectorizedArrowReader() {
@@ -96,21 +92,37 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
   }
 
   private enum ReadType {
-    FIXED_LENGTH_DECIMAL, INT_LONG_BACKED_DECIMAL, VARCHAR, VARBINARY, FIXED_WIDTH_BINARY,
-    BOOLEAN, INT, LONG, FLOAT, DOUBLE, TIMESTAMP_MILLIS
+    FIXED_LENGTH_DECIMAL,
+    INT_LONG_BACKED_DECIMAL,
+    VARCHAR,
+    VARBINARY,
+    FIXED_WIDTH_BINARY,
+    BOOLEAN,
+    INT,
+    LONG,
+    FLOAT,
+    DOUBLE,
+    TIMESTAMP_MILLIS
+  }
+
+  @Override
+  public void setBatchSize(int batchSize) {
+    this.batchSize = (batchSize == 0) ? DEFAULT_BATCH_SIZE : batchSize;
+    this.vectorizedColumnIterator.setBatchSize(batchSize);
   }
 
   @Override
-  public VectorHolder read(int numValsToRead) {
-    if (vec == null || !reuseContainers) {
-      allocateFieldVector();
+  public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+    if (reuse == null) {
+      allocateFieldVector(this.vectorizedColumnIterator.producesDictionaryEncodedVector());
       nullabilityHolder = new NullabilityHolder(batchSize);
     } else {
       vec.setValueCount(0);
       nullabilityHolder.reset();
     }
+    boolean dictEncoded = vectorizedColumnIterator.producesDictionaryEncodedVector();
     if (vectorizedColumnIterator.hasNext()) {
-      if (allPagesDictEncoded) {
+      if (dictEncoded) {
         vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
       } else {
         switch (readType) {
@@ -123,7 +135,6 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
             vectorizedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
             break;
           case VARBINARY:
-            ((IcebergArrowVectors.VarBinaryArrowVector) vec).setNullabilityHolder(nullabilityHolder);
             vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
             break;
           case VARCHAR:
@@ -131,7 +142,6 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
             vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
             break;
           case FIXED_WIDTH_BINARY:
-            ((IcebergArrowVectors.VarBinaryArrowVector) vec).setNullabilityHolder(nullabilityHolder);
             vectorizedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth, nullabilityHolder);
             break;
           case BOOLEAN:
@@ -157,11 +167,12 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
     }
     Preconditions.checkState(vec.getValueCount() == numValsToRead,
         "Number of values read, %s, does not equal expected, %s", vec.getValueCount(), numValsToRead);
-    return new VectorHolder(columnDescriptor, vec, allPagesDictEncoded, dictionary, nullabilityHolder);
+    return new VectorHolder(columnDescriptor, vec, dictEncoded, dictionary,
+        nullabilityHolder, icebergField.type());
   }
 
-  private void allocateFieldVector() {
-    if (allPagesDictEncoded) {
+  private void allocateFieldVector(boolean dictionaryEncodedVector) {
+    if (dictionaryEncodedVector) {
       Field field = new Field(
           icebergField.name(),
           new FieldType(icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null),
@@ -182,7 +193,7 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
             //TODO: Possibly use the uncompressed page size info to set the initial capacity
             vec.setInitialCapacity(batchSize * AVERAGE_VARIABLE_WIDTH_RECORD_SIZE);
             vec.allocateNewSafe();
-            this.readType =  ReadType.VARCHAR;
+            this.readType = ReadType.VARCHAR;
             this.typeWidth = UNKNOWN_WIDTH;
             break;
           case INT_8:
@@ -190,31 +201,31 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
           case INT_32:
             this.vec = arrowField.createVector(rootAlloc);
             ((IntVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.INT;
+            this.readType = ReadType.INT;
             this.typeWidth = (int) IntVector.TYPE_WIDTH;
             break;
           case DATE:
             this.vec = arrowField.createVector(rootAlloc);
             ((DateDayVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.INT;
+            this.readType = ReadType.INT;
             this.typeWidth = (int) IntVector.TYPE_WIDTH;
             break;
           case INT_64:
             this.vec = arrowField.createVector(rootAlloc);
             ((BigIntVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.LONG;
+            this.readType = ReadType.LONG;
             this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
             break;
           case TIMESTAMP_MILLIS:
             this.vec = arrowField.createVector(rootAlloc);
             ((BigIntVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.TIMESTAMP_MILLIS;
+            this.readType = ReadType.TIMESTAMP_MILLIS;
             this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
             break;
           case TIMESTAMP_MICROS:
             this.vec = arrowField.createVector(rootAlloc);
             ((TimeStampMicroTZVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.LONG;
+            this.readType = ReadType.LONG;
             this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
             break;
           case DECIMAL:
@@ -225,15 +236,15 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
             switch (primitive.getPrimitiveTypeName()) {
               case BINARY:
               case FIXED_LEN_BYTE_ARRAY:
-                this.readType =  ReadType.FIXED_LENGTH_DECIMAL;
+                this.readType = ReadType.FIXED_LENGTH_DECIMAL;
                 this.typeWidth = primitive.getTypeLength();
                 break;
               case INT64:
-                this.readType =  ReadType.INT_LONG_BACKED_DECIMAL;
+                this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
                 this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
                 break;
               case INT32:
-                this.readType =  ReadType.INT_LONG_BACKED_DECIMAL;
+                this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
                 this.typeWidth = (int) IntVector.TYPE_WIDTH;
                 break;
               default:
@@ -249,48 +260,48 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
         switch (primitive.getPrimitiveTypeName()) {
           case FIXED_LEN_BYTE_ARRAY:
             int len = ((Types.FixedType) icebergField.type()).length();
-            this.vec = new IcebergArrowVectors.VarBinaryArrowVector(icebergField.name(), rootAlloc);
+            this.vec = arrowField.createVector(rootAlloc);
             vec.setInitialCapacity(batchSize * len);
             vec.allocateNew();
-            this.readType =  ReadType.FIXED_WIDTH_BINARY;
+            this.readType = ReadType.FIXED_WIDTH_BINARY;
             this.typeWidth = len;
             break;
           case BINARY:
-            this.vec = new IcebergArrowVectors.VarBinaryArrowVector(icebergField.name(), rootAlloc);
+            this.vec = arrowField.createVector(rootAlloc);
             //TODO: Possibly use the uncompressed page size info to set the initial capacity
             vec.setInitialCapacity(batchSize * AVERAGE_VARIABLE_WIDTH_RECORD_SIZE);
             vec.allocateNewSafe();
-            this.readType =  ReadType.VARBINARY;
+            this.readType = ReadType.VARBINARY;
             this.typeWidth = UNKNOWN_WIDTH;
             break;
           case INT32:
             this.vec = arrowField.createVector(rootAlloc);
             ((IntVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.INT;
+            this.readType = ReadType.INT;
             this.typeWidth = (int) IntVector.TYPE_WIDTH;
             break;
           case FLOAT:
             this.vec = arrowField.createVector(rootAlloc);
             ((Float4Vector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.FLOAT;
+            this.readType = ReadType.FLOAT;
             this.typeWidth = (int) Float4Vector.TYPE_WIDTH;
             break;
           case BOOLEAN:
             this.vec = arrowField.createVector(rootAlloc);
             ((BitVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.BOOLEAN;
+            this.readType = ReadType.BOOLEAN;
             this.typeWidth = UNKNOWN_WIDTH;
             break;
           case INT64:
             this.vec = arrowField.createVector(rootAlloc);
             ((BigIntVector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.LONG;
+            this.readType = ReadType.LONG;
             this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
             break;
           case DOUBLE:
             this.vec = arrowField.createVector(rootAlloc);
             ((Float8Vector) vec).allocateNew(batchSize);
-            this.readType =  ReadType.DOUBLE;
+            this.readType = ReadType.DOUBLE;
             this.typeWidth = (int) Float8Vector.TYPE_WIDTH;
             break;
           default:
@@ -303,13 +314,9 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
   @Override
   public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
     ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath()));
-    allPagesDictEncoded = !ParquetUtil.hasNonDictionaryPages(chunkMetaData);
-    dictionary = vectorizedColumnIterator.setRowGroupInfo(source.getPageReader(columnDescriptor), allPagesDictEncoded);
-  }
-
-  @Override
-  public void reuseContainers(boolean reuse) {
-    this.reuseContainers = reuse;
+    this.dictionary = vectorizedColumnIterator.setRowGroupInfo(
+        source.getPageReader(columnDescriptor),
+        !ParquetUtil.hasNonDictionaryPages(chunkMetaData));
   }
 
   @Override
@@ -324,16 +331,30 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
     return columnDescriptor.toString();
   }
 
-  public static final VectorizedArrowReader NULL_VALUES_READER =
-      new VectorizedArrowReader() {
-        @Override
-        public VectorHolder read(int numValsToRead) {
-          return VectorHolder.NULL_VECTOR_HOLDER;
-        }
+  public static VectorizedArrowReader nulls() {
+    return NullVectorReader.INSTANCE;
+  }
+
+  private static final class NullVectorReader extends VectorizedArrowReader {
+    private static final NullVectorReader INSTANCE = new NullVectorReader();
+
+    @Override
+    public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+      return VectorHolder.dummyHolder(numValsToRead);
+    }
+
+    @Override
+    public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
+    }
+
+    @Override
+    public String toString() {
+      return "NullReader";
+    }
+
+    @Override
+    public void setBatchSize(int batchSize) {}
+  }
 
-        @Override
-        public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
-        }
-      };
 }
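
With this change, batch size moves out of the constructor into setBatchSize and read takes a reuse holder, so a caller drives one reader per column roughly as in the sketch below (a sketch only; the page store, column metadata, and row count are assumed to come from the Parquet footer, as they do in VectorizedParquetReader):

    import java.util.Map;
    import org.apache.iceberg.arrow.vectorized.VectorHolder;
    import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnPath;

    public class ColumnReadLoopSketch {
      // Reads one column across a single row group in fixed-size batches.
      static void readRowGroup(VectorizedArrowReader reader,
                               PageReadStore pageStore,
                               Map<ColumnPath, ColumnChunkMetaData> columnMetadata,
                               long rowsInGroup,
                               int batchSize) {
        reader.setBatchSize(batchSize);
        reader.setRowGroupInfo(pageStore, columnMetadata);

        VectorHolder holder = null;
        long remaining = rowsInGroup;
        while (remaining > 0) {
          int toRead = (int) Math.min(batchSize, remaining);
          // Passing the previous holder back lets the reader reuse its vectors.
          holder = reader.read(holder, toRead);
          remaining -= holder.numValues();
        }
      }
    }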
 
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index 2692cfc..cb9d278 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -36,20 +36,24 @@ import org.apache.parquet.column.page.PageReader;
 public class VectorizedColumnIterator extends BaseColumnIterator {
 
   private final VectorizedPageIterator vectorizedPageIterator;
-  private final int batchSize;
+  private int batchSize;
 
-  public VectorizedColumnIterator(ColumnDescriptor desc, String writerVersion, int batchSize,
-                                  boolean setArrowValidityVector) {
+  public VectorizedColumnIterator(ColumnDescriptor desc, String writerVersion, boolean setArrowValidityVector) {
     super(desc);
     Preconditions.checkArgument(desc.getMaxRepetitionLevel() == 0,
         "Only non-nested columns are supported for vectorized reads");
-    this.batchSize = batchSize;
     this.vectorizedPageIterator = new VectorizedPageIterator(desc, writerVersion, setArrowValidityVector);
   }
 
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
   public Dictionary setRowGroupInfo(PageReader store, boolean allPagesDictEncoded) {
-    super.setPageSource(store);
+    // setPageSource can result in a data page read. If that happens, we need
+    // to know in advance whether all the pages in the row group are dictionary encoded or not
     this.vectorizedPageIterator.setAllPagesDictEncoded(allPagesDictEncoded);
+    super.setPageSource(store);
     return dictionary;
   }
 
@@ -199,4 +203,8 @@ public class VectorizedColumnIterator extends BaseColumnIterator {
     return vectorizedPageIterator;
   }
 
+  public boolean producesDictionaryEncodedVector() {
+    return vectorizedPageIterator.producesDictionaryEncodedVector();
+  }
+
 }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index e71d61a..52e389e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.arrow.vectorized.parquet;
 
-import io.netty.buffer.ArrowBuf;
 import java.nio.ByteBuffer;
 import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.BitVectorHelper;
@@ -53,15 +52,14 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
         case RLE:
           for (int i = 0; i < numValues; i++) {
             intVector.set(idx, currentValue);
-            nullabilityHolder.setNotNull(idx);
+            setNotNull(intVector, nullabilityHolder, idx);
             idx++;
           }
           break;
         case PACKED:
           for (int i = 0; i < numValues; i++) {
-            intVector.set(idx, packedValuesBuffer[packedValuesBufferIdx]);
-            nullabilityHolder.setNotNull(idx);
-            packedValuesBufferIdx++;
+            intVector.set(idx, packedValuesBuffer[packedValuesBufferIdx++]);
+            setNotNull(intVector, nullabilityHolder, idx);
             idx++;
           }
           break;
@@ -72,7 +70,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
   }
 
   void readBatchOfDictionaryEncodedLongs(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
-                                         NullabilityHolder nullabilityHolder) {
+                                         NullabilityHolder nullabilityHolder, int typeWidth) {
     int left = numValuesToRead;
     int idx = startOffset;
     while (left > 0) {
@@ -83,24 +81,16 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
       switch (mode) {
         case RLE:
           for (int i = 0; i < numValues; i++) {
-            vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentValue));
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(currentValue));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
         case PACKED:
           for (int i = 0; i < numValues; i++) {
             vector.getDataBuffer()
-                .setLong(idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+                .setLong(idx * typeWidth, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
@@ -110,8 +100,9 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
     }
   }
 
-  void readBatchOfDictionaryEncodedTimestampMillis(FieldVector vector, int startOffset, int numValuesToRead,
-                                                   Dictionary dict, NullabilityHolder nullabilityHolder) {
+  void readBatchOfDictionaryEncodedTimestampMillis(
+      FieldVector vector, int startOffset, int numValuesToRead,
+      Dictionary dict, NullabilityHolder nullabilityHolder, int typeWidth) {
     int left = numValuesToRead;
     int idx = startOffset;
     while (left > 0) {
@@ -122,24 +113,16 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
       switch (mode) {
         case RLE:
           for (int i = 0; i < numValues; i++) {
-            vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentValue) * 1000);
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(currentValue) * 1000);
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
         case PACKED:
           for (int i = 0; i < numValues; i++) {
             vector.getDataBuffer()
-                .setLong(idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]) * 1000);
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+                .setLong(idx * typeWidth, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]) * 1000);
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
@@ -150,7 +133,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
   }
 
   void readBatchOfDictionaryEncodedIntegers(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
-                                            NullabilityHolder nullabilityHolder) {
+                                            NullabilityHolder nullabilityHolder, int typeWidth) {
     int left = numValuesToRead;
     int idx = startOffset;
     while (left > 0) {
@@ -158,27 +141,19 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
         this.readNextGroup();
       }
       int num = Math.min(left, this.currentCount);
-      ArrowBuf dataBuffer = vector.getDataBuffer();
       switch (mode) {
         case RLE:
           for (int i = 0; i < num; i++) {
-            dataBuffer.setInt(idx, dict.decodeToInt(currentValue));
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer().setInt(idx * typeWidth, dict.decodeToInt(currentValue));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
         case PACKED:
           for (int i = 0; i < num; i++) {
-            dataBuffer.setInt(idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer()
+                .setInt(idx * typeWidth, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
@@ -189,7 +164,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
   }
 
   void readBatchOfDictionaryEncodedFloats(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
-                                          NullabilityHolder nullabilityHolder) {
+                                          NullabilityHolder nullabilityHolder, int typeWidth) {
     int left = numValuesToRead;
     int idx = startOffset;
     while (left > 0) {
@@ -200,23 +175,16 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
       switch (mode) {
         case RLE:
           for (int i = 0; i < num; i++) {
-            vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentValue));
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(currentValue));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
         case PACKED:
           for (int i = 0; i < num; i++) {
-            vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer()
+                .setFloat(idx * typeWidth, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
@@ -227,7 +195,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
   }
 
   void readBatchOfDictionaryEncodedDoubles(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
-                                           NullabilityHolder nullabilityHolder) {
+                                           NullabilityHolder nullabilityHolder, int typeWidth) {
     int left = numValuesToRead;
     int idx = startOffset;
     while (left > 0) {
@@ -238,24 +206,16 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
       switch (mode) {
         case RLE:
           for (int i = 0; i < num; i++) {
-            vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentValue));
-            nullabilityHolder.setNotNull(idx);
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(currentValue));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
         case PACKED:
           for (int i = 0; i < num; i++) {
-            vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            vector.getDataBuffer()
+                .setDouble(idx * typeWidth, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
+            setNotNull(vector, nullabilityHolder, idx);
             idx++;
           }
           break;
@@ -279,27 +239,14 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
         case RLE:
           for (int i = 0; i < num; i++) {
             ByteBuffer buffer = dict.decodeToBinary(currentValue).toByteBuffer();
-            vector.getDataBuffer().setBytes(idx * typeWidth, buffer.array(),
-                buffer.position() + buffer.arrayOffset(), buffer.limit() - buffer.position());
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            setFixedWidthBinary(vector, typeWidth, nullabilityHolder, idx, buffer);
             idx++;
           }
           break;
         case PACKED:
           for (int i = 0; i < num; i++) {
             ByteBuffer buffer = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).toByteBuffer();
-            vector.getDataBuffer()
-                .setBytes(idx * typeWidth, buffer.array(),
-                    buffer.position() + buffer.arrayOffset(), buffer.limit() - buffer.position());
-            if (setArrowValidityVector) {
-              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-            } else {
-              nullabilityHolder.setNotNull(idx);
-            }
+            setFixedWidthBinary(vector, typeWidth, nullabilityHolder, idx, buffer);
             idx++;
           }
           break;
@@ -309,6 +256,22 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
     }
   }
 
+  private void setFixedWidthBinary(
+      FieldVector vector, int typeWidth, NullabilityHolder nullabilityHolder,
+      int idx, ByteBuffer buffer) {
+    vector.getDataBuffer()
+        .setBytes(idx * typeWidth, buffer.array(),
+            buffer.position() + buffer.arrayOffset(), buffer.limit() - buffer.position());
+    setNotNull(vector, nullabilityHolder, idx);
+  }
+
+  private void setNotNull(FieldVector vector, NullabilityHolder nullabilityHolder, int idx) {
+    nullabilityHolder.setNotNull(idx);
+    if (setArrowValidityVector) {
+      BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+    }
+  }
+
   void readBatchOfDictionaryEncodedFixedLengthDecimals(FieldVector vector, int typeWidth, int startOffset,
                                                        int numValuesToRead, Dictionary dict,
                                                        NullabilityHolder nullabilityHolder) {
@@ -405,7 +368,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
             ((DecimalVector) vector).set(
                 idx,
                 typeWidth == Integer.BYTES ?
-                    dict.decodeToInt(currentValue)
+                    dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++])
                     : dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
             nullabilityHolder.setNotNull(idx);
             idx++;
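
Most of the edits in this reader replace element indices with byte offsets (idx * typeWidth) when writing through the vector's data buffer, and fold the validity bookkeeping into setNotNull. The offset change matters because ArrowBuf positions are raw byte offsets rather than element indices; a small standalone illustration of that convention (not code from this commit):

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;
    import org.apache.arrow.vector.BitVectorHelper;

    public class ByteOffsetSketch {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             BigIntVector vector = new BigIntVector("longs", allocator)) {
          vector.allocateNew(4);
          for (int i = 0; i < 4; i++) {
            // Element i of an 8-byte column lives at byte offset i * TYPE_WIDTH, not i.
            vector.getDataBuffer().setLong(i * BigIntVector.TYPE_WIDTH, 100L + i);
            BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), i);
          }
          vector.setValueCount(4);
          System.out.println(vector.get(3)); // 103
        }
      }
    }
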
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 7cc32e0..2aa6f2c 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -47,12 +47,19 @@ public class VectorizedPageIterator extends BasePageIterator {
     this.setArrowValidityVector = setValidityVector;
   }
 
-  private boolean eagerDecodeDictionary;
   private ValuesAsBytesReader plainValuesReader = null;
   private VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader = null;
   private boolean allPagesDictEncoded;
   private VectorizedParquetDefinitionLevelReader vectorizedDefinitionLevelReader;
 
+  private enum DictionaryDecodeMode {
+    NONE, // plain encoding
+    LAZY,
+    EAGER
+  }
+
+  private DictionaryDecodeMode dictionaryDecodeMode;
+
   public void setAllPagesDictEncoded(boolean allDictEncoded) {
     this.allPagesDictEncoded = allDictEncoded;
   }
@@ -98,7 +105,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedIntegers(
           vector,
           numValsInVector,
@@ -132,7 +139,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedLongs(
           vector,
           numValsInVector,
@@ -168,7 +175,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedTimestampMillis(
           vector,
           numValsInVector,
@@ -202,7 +209,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFloats(
           vector,
           numValsInVector,
@@ -236,7 +243,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedDoubles(
           vector,
           numValsInVector,
@@ -274,7 +281,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader
           .readBatchOfDictionaryEncodedIntLongBackedDecimals(
               vector,
@@ -312,7 +319,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(
           vector,
           numValsInVector,
@@ -347,7 +354,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedVarWidth(
           vector,
           numValsInVector,
@@ -380,7 +387,7 @@ public class VectorizedPageIterator extends BasePageIterator {
     if (actualBatchSize <= 0) {
       return 0;
     }
-    if (eagerDecodeDictionary) {
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(
           vector,
           numValsInVector,
@@ -403,6 +410,10 @@ public class VectorizedPageIterator extends BasePageIterator {
     return actualBatchSize;
   }
 
+  public boolean producesDictionaryEncodedVector() {
+    return dictionaryDecodeMode == DictionaryDecodeMode.LAZY;
+  }
+
   /**
    * Method for reading batches of booleans.
    */
@@ -426,8 +437,6 @@ public class VectorizedPageIterator extends BasePageIterator {
   @Override
   protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
     ValuesReader previousReader = plainValuesReader;
-    this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dictionary != null &&
-        (ParquetUtil.isIntType(desc.getPrimitiveType()) || !allPagesDictEncoded);
     if (dataEncoding.usesDictionary()) {
       if (dictionary == null) {
         throw new ParquetDecodingException(
@@ -437,12 +446,18 @@ public class VectorizedPageIterator extends BasePageIterator {
         dictionaryEncodedValuesReader =
             new VectorizedDictionaryEncodedParquetValuesReader(desc.getMaxDefinitionLevel(), setArrowValidityVector);
         dictionaryEncodedValuesReader.initFromPage(valueCount, in);
+        if (ParquetUtil.isIntType(desc.getPrimitiveType()) || !allPagesDictEncoded) {
+          dictionaryDecodeMode = DictionaryDecodeMode.EAGER;
+        } else {
+          dictionaryDecodeMode = DictionaryDecodeMode.LAZY;
+        }
       } catch (IOException e) {
         throw new ParquetDecodingException("could not read page in col " + desc, e);
       }
     } else {
       plainValuesReader = new ValuesAsBytesReader();
       plainValuesReader.initFromPage(valueCount, in);
+      dictionaryDecodeMode = DictionaryDecodeMode.NONE;
     }
     if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
         previousReader != null && previousReader instanceof RequiresPreviousReader) {
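
initDataReader now records a per-page DictionaryDecodeMode instead of the single eagerDecodeDictionary flag, and producesDictionaryEncodedVector() reports whether the page can be exposed as raw dictionary ids. A condensed restatement of that decision, with names chosen here for illustration rather than taken from the class:

    final class DecodeModeSketch {
      enum DictionaryDecodeMode { NONE, LAZY, EAGER }

      static DictionaryDecodeMode decodeMode(boolean pageUsesDictionary,
                                             boolean intBackedType,
                                             boolean allPagesDictEncoded) {
        if (!pageUsesDictionary) {
          return DictionaryDecodeMode.NONE;   // plain-encoded page
        }
        // Decode eagerly when Parquet fell back to plain encoding somewhere in the
        // row group or when the physical type is an int type; otherwise hand the
        // dictionary ids through and let them be resolved lazily.
        return (intBackedType || !allPagesDictEncoded)
            ? DictionaryDecodeMode.EAGER
            : DictionaryDecodeMode.LAZY;
      }

      private DecodeModeSketch() {
      }
    }
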
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 86918f7..8a26348 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -63,11 +63,10 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < numValues; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              vector.set(idx, dictionaryEncodedValuesReader.readInteger());
+              vector.getDataBuffer().setInt(idx * IntVector.TYPE_WIDTH, dictionaryEncodedValuesReader.readInteger());
+              nullabilityHolder.setNotNull(idx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-              } else {
-                nullabilityHolder.setNotNull(idx);
               }
             } else {
               setNull(nullabilityHolder, idx, vector.getValidityBuffer());
@@ -106,10 +105,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
           for (int i = 0; i < numValues; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               vector.getDataBuffer().setLong(bufferIdx * typeWidth, valuesReader.readLong());
+              nullabilityHolder.setNotNull(bufferIdx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-              } else {
-                nullabilityHolder.setNotNull(bufferIdx);
               }
             } else {
               setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
@@ -140,12 +138,11 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
             for (int i = 0; i < numValues; i++) {
               vector.getDataBuffer().setLong(bufferIdx * typeWidth, valuesReader.readLong() * 1000);
             }
+            nullabilityHolder.setNotNulls(bufferIdx, numValues);
             if (setArrowValidityVector) {
               for (int i = 0; i < numValues; i++) {
                 BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx + i);
               }
-            } else {
-              nullabilityHolder.setNotNulls(bufferIdx, numValues);
             }
           } else {
             setNulls(nullabilityHolder, bufferIdx, numValues, validityBuffer);
@@ -156,10 +153,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
           for (int i = 0; i < numValues; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               vector.getDataBuffer().setLong(bufferIdx * typeWidth, valuesReader.readLong() * 1000);
+              nullabilityHolder.setNotNull(bufferIdx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-              } else {
-                nullabilityHolder.setNotNull(bufferIdx);
               }
             } else {
               setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
@@ -193,7 +189,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case RLE:
           if (currentValue == maxDefLevel) {
             dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedLongs(vector,
-                idx, numValues, dict, nullabilityHolder);
+                idx, numValues, dict, nullabilityHolder, typeWidth);
           } else {
             setNulls(nullabilityHolder, idx, numValues, validityBuffer);
           }
@@ -202,11 +198,11 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < numValues; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setLong(idx, dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
+              vector.getDataBuffer().setLong(idx * typeWidth,
+                  dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
+              nullabilityHolder.setNotNull(idx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-              } else {
-                nullabilityHolder.setNotNull(idx);
               }
             } else {
               setNull(nullabilityHolder, idx, validityBuffer);
@@ -240,7 +236,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case RLE:
           if (currentValue == maxDefLevel) {
             dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedTimestampMillis(vector,
-                idx, numValues, dict, nullabilityHolder);
+                idx, numValues, dict, nullabilityHolder, typeWidth);
           } else {
             setNulls(nullabilityHolder, idx, numValues, validityBuffer);
           }
@@ -249,12 +245,11 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < numValues; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setLong(idx,
+              vector.getDataBuffer().setLong(idx * typeWidth,
                   dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()) * 1000);
+              nullabilityHolder.setNotNull(idx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-              } else {
-                nullabilityHolder.setNotNull(idx);
               }
             } else {
               setNull(nullabilityHolder, idx, validityBuffer);
@@ -293,10 +288,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
           for (int i = 0; i < num; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               vector.getDataBuffer().setInt(bufferIdx * typeWidth, valuesReader.readInteger());
+              nullabilityHolder.setNotNull(bufferIdx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-              } else {
-                nullabilityHolder.setNotNull(bufferIdx);
               }
             } else {
               setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
@@ -329,7 +323,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case RLE:
           if (currentValue == maxDefLevel) {
             dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedIntegers(vector, idx,
-                num, dict, nullabilityHolder);
+                num, dict, nullabilityHolder, typeWidth);
           } else {
             setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
           }
@@ -338,11 +332,11 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < num; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setInt(idx, dict.decodeToInt(dictionaryEncodedValuesReader.readInteger()));
+              vector.getDataBuffer()
+                  .setInt(idx * typeWidth, dict.decodeToInt(dictionaryEncodedValuesReader.readInteger()));
+              nullabilityHolder.setNotNull(idx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-              } else {
-                nullabilityHolder.setNotNull(idx);
               }
             } else {
               setNull(nullabilityHolder, idx, vector.getValidityBuffer());
@@ -381,10 +375,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
           for (int i = 0; i < num; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               vector.getDataBuffer().setFloat(bufferIdx * typeWidth, valuesReader.readFloat());
+              nullabilityHolder.setNotNull(bufferIdx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-              } else {
-                nullabilityHolder.setNotNull(bufferIdx);
               }
             } else {
               setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
@@ -418,7 +411,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case RLE:
           if (currentValue == maxDefLevel) {
             dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedFloats(vector, idx,
-                num, dict, nullabilityHolder);
+                num, dict, nullabilityHolder, typeWidth);
           } else {
             setNulls(nullabilityHolder, idx, num, validityBuffer);
           }
@@ -427,11 +420,11 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < num; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(dictionaryEncodedValuesReader.readInteger()));
+              vector.getDataBuffer()
+                  .setFloat(idx * typeWidth, dict.decodeToFloat(dictionaryEncodedValuesReader.readInteger()));
+              nullabilityHolder.setNotNull(idx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-              } else {
-                nullabilityHolder.setNotNull(idx);
               }
             } else {
               setNull(nullabilityHolder, idx, validityBuffer);
@@ -471,10 +464,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
           for (int i = 0; i < num; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               vector.getDataBuffer().setDouble(bufferIdx * typeWidth, valuesReader.readDouble());
+              nullabilityHolder.setNotNull(bufferIdx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(),  bufferIdx);
-              } else {
-                nullabilityHolder.setNotNull(bufferIdx);
               }
             } else {
               setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
@@ -507,7 +499,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case RLE:
           if (currentValue == maxDefLevel) {
             dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedDoubles(vector, idx,
-                num, dict, nullabilityHolder);
+                num, dict, nullabilityHolder, typeWidth);
           } else {
             setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
           }
@@ -516,11 +508,11 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < num; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(dictionaryEncodedValuesReader.readInteger()));
+              vector.getDataBuffer()
+                  .setDouble(idx * typeWidth, dict.decodeToDouble(dictionaryEncodedValuesReader.readInteger()));
+              nullabilityHolder.setNotNull(idx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-              } else {
-                nullabilityHolder.setNotNull(idx);
               }
             } else {
               setNull(nullabilityHolder, idx, vector.getValidityBuffer());
@@ -604,10 +596,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
               ByteBuffer buffer = dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).toByteBuffer();
               vector.getDataBuffer().setBytes(idx * typeWidth, buffer.array(),
                   buffer.position() + buffer.arrayOffset(), buffer.limit() - buffer.position());
+              nullabilityHolder.setNotNull(idx);
               if (setArrowValidityVector) {
                 BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-              } else {
-                nullabilityHolder.setNotNull(idx);
               }
             } else {
               setNull(nullabilityHolder, idx, vector.getValidityBuffer());
@@ -764,10 +755,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         buffer.limit() - buffer.position());
     // Similarly, we need to get the latest reference to the validity buffer as well
     // since reallocation changes reference of the validity buffers as well.
+    nullabilityHolder.setNotNull(bufferIdx);
     if (setArrowValidityVector) {
       BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-    } else {
-      nullabilityHolder.setNotNull(bufferIdx);
     }
   }
 
@@ -858,10 +848,9 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
                                        ValuesAsBytesReader valuesReader, int bufferIdx, byte[] byteArray) {
     valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
     vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+    nullabilityHolder.setNotNull(bufferIdx);
     if (setArrowValidityVector) {
       BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-    } else {
-      nullabilityHolder.setNotNull(bufferIdx);
     }
   }
 
@@ -972,12 +961,11 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
     if (currentValue == maxDefLevel) {
       ByteBuffer buffer = valuesReader.getBuffer(numValues * typeWidth);
       vector.getDataBuffer().setBytes(bufferIdx * typeWidth, buffer);
+      nullabilityHolder.setNotNulls(bufferIdx, numValues);
       if (setArrowValidityVector) {
         for (int i = 0; i < numValues; i++) {
           BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx + i);
         }
-      } else {
-        nullabilityHolder.setNotNulls(bufferIdx, numValues);
       }
     } else {
       setNulls(nullabilityHolder, bufferIdx, numValues, validityBuffer);
@@ -985,20 +973,18 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
   }
 
   private void setNull(NullabilityHolder nullabilityHolder, int bufferIdx, ArrowBuf validityBuffer) {
+    nullabilityHolder.setNull(bufferIdx);
     if (setArrowValidityVector) {
       BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
-    } else {
-      nullabilityHolder.setNull(bufferIdx);
     }
   }
 
   private void setNulls(NullabilityHolder nullabilityHolder, int idx, int numValues, ArrowBuf validityBuffer) {
+    nullabilityHolder.setNulls(idx, numValues);
     if (setArrowValidityVector) {
       for (int i = 0; i < numValues; i++) {
         BitVectorHelper.setValidityBit(validityBuffer, idx + i, 0);
       }
-    } else {
-      nullabilityHolder.setNulls(idx, numValues);
     }
   }
 
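[Editor's note] The recurring change in the hunks above is that NullabilityHolder is now updated unconditionally, while the Arrow validity bit is only maintained when setArrowValidityVector is enabled (previously the two paths were mutually exclusive). Tracking nulls in the holder regardless of the Arrow setting is what lets the IcebergArrowColumnVector added later in this commit answer isNullAt without consulting the validity vector. A condensed sketch of that pattern, for illustration only and with names assumed from the hunks above, not taken from the commit:

    import io.netty.buffer.ArrowBuf;
    import org.apache.arrow.vector.BitVectorHelper;
    import org.apache.iceberg.arrow.vectorized.NullabilityHolder;

    // Sketch only; restates the pattern applied throughout this file.
    class NullabilityPatternSketch {
      private final boolean setArrowValidityVector;

      NullabilityPatternSketch(boolean setArrowValidityVector) {
        this.setArrowValidityVector = setArrowValidityVector;
      }

      // Nullability is always recorded in the holder; the Arrow validity bit is optional.
      void markNotNull(NullabilityHolder nulls, ArrowBuf validityBuffer, int idx) {
        nulls.setNotNull(idx);
        if (setArrowValidityVector) {
          BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
        }
      }

      void markNull(NullabilityHolder nulls, ArrowBuf validityBuffer, int idx) {
        nulls.setNull(idx);
        if (setArrowValidityVector) {
          BitVectorHelper.setValidityBit(validityBuffer, idx, 0);
        }
      }
    }
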
diff --git a/build.gradle b/build.gradle
index c5bed32..f76c719 100644
--- a/build.gradle
+++ b/build.gradle
@@ -410,6 +410,7 @@ project(':iceberg-spark') {
     compile project(':iceberg-parquet')
     compile project(':iceberg-arrow')
     compile project(':iceberg-hive')
+    compile project(':iceberg-arrow')
 
     compileOnly "org.apache.avro:avro"
     compileOnly("org.apache.spark:spark-hive_2.11") {
@@ -428,6 +429,18 @@ project(':iceberg-spark') {
       exclude group: 'org.apache.avro', module: 'avro'
     }
   }
+
+  test {
+    // For vectorized reads
+    // Allow unsafe memory access to avoid the costly bounds check Arrow otherwise performs on every index access
+    systemProperty("arrow.enable_unsafe_memory_access", "true")
+    // Disable the expensive null check on every get(index) call.
+    // Iceberg manages nullability checks itself instead of relying on Arrow.
+    systemProperty("arrow.enable_null_check_for_get", "false")
+
+    // Vectorized reads need more memory
+    maxHeapSize '2500m'
+  }
 }
 
 project(':iceberg-spark3') {
@@ -455,6 +468,17 @@ project(':iceberg-spark3') {
     testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
   }
+  test {
+    // For vectorized reads
+    // Allow unsafe memory access to avoid the costly bounds check Arrow otherwise performs on every index access
+    systemProperty("arrow.enable_unsafe_memory_access", "true")
+    // Disable the expensive null check on every get(index) call.
+    // Iceberg manages nullability checks itself instead of relying on Arrow.
+    systemProperty("arrow.enable_null_check_for_get", "false")
+
+    // Vectorized reads need more memory
+    maxHeapSize '2500m'
+  }
 }
 
 project(':iceberg-pig') {
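
[Editor's note] The two Arrow system properties set in the test blocks above are read once, when Arrow initializes its static bounds- and null-checking flags, so they only take effect if set before any Arrow class is loaded; the -D JVM options used by Gradle here are the usual way to do that. A Java equivalent, as a sketch only (the class name is illustrative, not part of the commit):

    // Sketch only: set the same Arrow flags programmatically.
    // They must be set before any Arrow class loads, which is why the build uses JVM options.
    public class ArrowFlagsSketch {
      public static void main(String[] args) {
        System.setProperty("arrow.enable_unsafe_memory_access", "true");
        System.setProperty("arrow.enable_null_check_for_get", "false");
        // ... only after this point should Arrow/Iceberg vectorized read code run
      }
    }
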
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index f2ad3e5..39d0668 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -77,6 +77,12 @@ public class TableProperties {
   public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
   public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB
 
+  public static final String PARQUET_VECTORIZATION_ENABLED = "read.parquet.vectorization.enabled";
+  public static final boolean PARQUET_VECTORIZATION_ENABLED_DEFAULT = false;
+
+  public static final String PARQUET_BATCH_SIZE = "read.parquet.vectorization.batch-size";
+  public static final int PARQUET_BATCH_SIZE_DEFAULT = 5000;
+
   public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
   public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index f92230c..c4c8ebf 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -280,6 +280,7 @@ public class ParquetUtil {
         case INT_8:
         case INT_16:
         case INT_32:
+        case DATE:
           return true;
         default:
           return false;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
index c3f87ee..6cb9da5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -90,6 +90,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
     private final long totalValues;
     private final int batchSize;
     private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
+    private final boolean reuseContainers;
     private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
     private long valuesRead = 0;
@@ -98,13 +99,15 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
     FileIterator(ReadConf conf) {
       this.reader = conf.reader();
       this.shouldSkip = conf.shouldSkip();
-      this.model = conf.vectorizedModel();
       this.totalValues = conf.totalValues();
-      this.model.reuseContainers(conf.reuseContainers());
+      this.reuseContainers = conf.reuseContainers();
+      this.model = conf.vectorizedModel();
       this.batchSize = conf.batchSize();
+      this.model.setBatchSize(this.batchSize);
       this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
     }
 
+
     @Override
     public boolean hasNext() {
       return valuesRead < totalValues;
@@ -118,10 +121,16 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
       if (valuesRead >= nextRowGroupStart) {
         advance();
       }
-      long numValuesToRead = Math.min(nextRowGroupStart - valuesRead, batchSize);
+
       // batchSize is an integer, so casting to integer is safe
-      this.last = model.read((int) numValuesToRead);
+      int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);
+      if (reuseContainers) {
+        this.last = model.read(last, numValuesToRead);
+      } else {
+        this.last = model.read(null, numValuesToRead);
+      }
       valuesRead += numValuesToRead;
+
       return last;
     }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
index 3eb3303..25c16f0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
@@ -31,25 +31,23 @@ public interface VectorizedReader<T> {
 
   /**
    * Reads a batch of type @param &lt;T&gt; and of size numRows
+   *
+   * @param reuse   container for the last batch to be reused for next batch
    * @param numRows number of rows to read
    * @return batch of records of type @param &lt;T&gt;
    */
-  T read(int numRows);
+  T read(T reuse, int numRows);
+
+  void setBatchSize(int batchSize);
 
   /**
-   *
-   * @param pages row group information for all the columns
+   * @param pages    row group information for all the columns
    * @param metadata map of {@link ColumnPath} -&gt; {@link ColumnChunkMetaData} for the row group
    */
   void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata);
 
   /**
-   * Set up the reader to reuse the underlying containers used for storing batches
-   */
-  void reuseContainers(boolean reuse);
-
-  /**
-   * Release any resources allocated
+   * Release any resources allocated.
    */
   void close();
 }
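
[Editor's note] To make the revised contract concrete, here is a toy implementation, a sketch only and not part of the commit: read(reuse, numRows) recycles the caller-supplied container when one is passed and allocates a fresh one otherwise, which is how FileIterator above drives it.

    import java.util.Map;
    import org.apache.iceberg.parquet.VectorizedReader;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnPath;

    // Toy reader illustrating the reuse semantics of the new read(T, int) signature.
    class IntArrayReaderSketch implements VectorizedReader<int[]> {
      private int batchSize;

      @Override
      public int[] read(int[] reuse, int numRows) {
        int[] batch = (reuse != null && reuse.length >= numRows) ? reuse : new int[numRows];
        // ... decode up to numRows values from the current row group into batch ...
        return batch;
      }

      @Override
      public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
      }

      @Override
      public void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata) {
        // position decoding state at the start of the row group (omitted in this sketch)
      }

      @Override
      public void close() {
      }
    }
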
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index 57863e0..91568db 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.Dataset;
@@ -92,15 +93,24 @@ public abstract class IcebergSourceBenchmark {
     }
   }
 
-  protected void setupSpark() {
-    spark = SparkSession.builder()
-        .config("spark.ui.enabled", false)
-        .master("local")
-        .getOrCreate();
+  protected void setupSpark(boolean enableDictionaryEncoding) {
+    SparkSession.Builder builder = SparkSession.builder()
+            .config("spark.ui.enabled", false);
+    if (!enableDictionaryEncoding) {
+      builder.config("parquet.dictionary.page.size", "1")
+              .config("parquet.enable.dictionary", false)
+              .config(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
+    }
+    builder.master("local");
+    spark = builder.getOrCreate();
     Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
     hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue()));
   }
 
+  protected void setupSpark() {
+    setupSpark(false);
+  }
+
   protected void tearDownSpark() {
     spark.stop();
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java
new file mode 100644
index 0000000..c9c9959
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.arrow.vector.ValueVector;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public abstract class ArrowVectorAccessor {
+
+  private final ValueVector vector;
+  private final ArrowColumnVector[] childColumns;
+
+  ArrowVectorAccessor(ValueVector vector) {
+    this.vector = vector;
+    this.childColumns = new ArrowColumnVector[0];
+  }
+
+  ArrowVectorAccessor(ValueVector vector, ArrowColumnVector[] children) {
+    this.vector = vector;
+    this.childColumns = children;
+  }
+
+  final void close() {
+    for (ArrowColumnVector column : childColumns) {
+      // Closing an ArrowColumnVector is expected to not throw any exception
+      column.close();
+    }
+    vector.close();
+  }
+
+  boolean getBoolean(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: boolean");
+  }
+
+  int getInt(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: int");
+  }
+
+  long getLong(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: long");
+  }
+
+  float getFloat(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: float");
+  }
+
+  double getDouble(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: double");
+  }
+
+  Decimal getDecimal(int rowId, int precision, int scale) {
+    throw new UnsupportedOperationException("Unsupported type: decimal");
+  }
+
+  UTF8String getUTF8String(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: UTF8String");
+  }
+
+  byte[] getBinary(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: binary");
+  }
+
+  ColumnarArray getArray(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: array");
+  }
+
+  ArrowColumnVector childColumn(int pos) {
+    return childColumns[pos];
+  }
+
+  public ValueVector getVector() {
+    return vector;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
new file mode 100644
index 0000000..74732a3
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import io.netty.buffer.ArrowBuf;
+import java.math.BigInteger;
+import java.util.stream.IntStream;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.iceberg.arrow.vectorized.IcebergArrowVectors;
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class ArrowVectorAccessors {
+
+  private ArrowVectorAccessors() {}
+
+  static ArrowVectorAccessor getVectorAccessor(VectorHolder holder) {
+    Dictionary dictionary = holder.dictionary();
+    boolean isVectorDictEncoded = holder.isDictionaryEncoded();
+    ColumnDescriptor desc = holder.descriptor();
+    FieldVector vector = holder.vector();
+    PrimitiveType primitive = desc.getPrimitiveType();
+    if (isVectorDictEncoded) {
+      return getDictionaryVectorAccessor(dictionary, desc, vector, primitive);
+    } else {
+      return getPlainVectorAccessor(vector);
+    }
+  }
+
+  @NotNull
+  private static ArrowVectorAccessor getDictionaryVectorAccessor(
+      Dictionary dictionary,
+      ColumnDescriptor desc,
+      FieldVector vector, PrimitiveType primitive) {
+    Preconditions.checkState(vector instanceof IntVector, "Dictionary ids should be stored in IntVectors only");
+    if (primitive.getOriginalType() != null) {
+      switch (desc.getPrimitiveType().getOriginalType()) {
+        case ENUM:
+        case JSON:
+        case UTF8:
+        case BSON:
+          return new DictionaryStringAccessor((IntVector) vector, dictionary);
+        case INT_64:
+        case TIMESTAMP_MILLIS:
+        case TIMESTAMP_MICROS:
+          return new DictionaryLongAccessor((IntVector) vector, dictionary);
+        case DECIMAL:
+          switch (primitive.getPrimitiveTypeName()) {
+            case BINARY:
+            case FIXED_LEN_BYTE_ARRAY:
+              return new DictionaryDecimalBinaryAccessor(
+                  (IntVector) vector,
+                  dictionary);
+            case INT64:
+              return new DictionaryDecimalLongAccessor(
+                  (IntVector) vector,
+                  dictionary);
+            case INT32:
+              return new DictionaryDecimalIntAccessor(
+                  (IntVector) vector,
+                  dictionary);
+            default:
+              throw new UnsupportedOperationException(
+                  "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+          }
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported logical type: " + primitive.getOriginalType());
+      }
+    } else {
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new DictionaryBinaryAccessor((IntVector) vector, dictionary);
+        case FLOAT:
+          return new DictionaryFloatAccessor((IntVector) vector, dictionary);
+        case INT64:
+          return new DictionaryLongAccessor((IntVector) vector, dictionary);
+        case DOUBLE:
+          return new DictionaryDoubleAccessor((IntVector) vector, dictionary);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+  }
+
+  @NotNull
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private static ArrowVectorAccessor getPlainVectorAccessor(FieldVector vector) {
+    if (vector instanceof BitVector) {
+      return new BooleanAccessor((BitVector) vector);
+    } else if (vector instanceof IntVector) {
+      return new IntAccessor((IntVector) vector);
+    } else if (vector instanceof BigIntVector) {
+      return new LongAccessor((BigIntVector) vector);
+    } else if (vector instanceof Float4Vector) {
+      return new FloatAccessor((Float4Vector) vector);
+    } else if (vector instanceof Float8Vector) {
+      return new DoubleAccessor((Float8Vector) vector);
+    } else if (vector instanceof IcebergArrowVectors.DecimalArrowVector) {
+      return new DecimalAccessor((IcebergArrowVectors.DecimalArrowVector) vector);
+    } else if (vector instanceof IcebergArrowVectors.VarcharArrowVector) {
+      return new StringAccessor((IcebergArrowVectors.VarcharArrowVector) vector);
+    } else if (vector instanceof VarBinaryVector) {
+      return new BinaryAccessor((VarBinaryVector) vector);
+    } else if (vector instanceof DateDayVector) {
+      return new DateAccessor((DateDayVector) vector);
+    } else if (vector instanceof TimeStampMicroTZVector) {
+      return new TimestampAccessor((TimeStampMicroTZVector) vector);
+    } else if (vector instanceof ListVector) {
+      ListVector listVector = (ListVector) vector;
+      return new ArrayAccessor(listVector);
+    } else if (vector instanceof StructVector) {
+      StructVector structVector = (StructVector) vector;
+      return new StructAccessor(structVector);
+    }
+    throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
+  }
+
+  private static class BooleanAccessor extends ArrowVectorAccessor {
+
+    private final BitVector vector;
+
+    BooleanAccessor(BitVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final boolean getBoolean(int rowId) {
+      return vector.get(rowId) == 1;
+    }
+  }
+
+  private static class IntAccessor extends ArrowVectorAccessor {
+
+    private final IntVector vector;
+
+    IntAccessor(IntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class LongAccessor extends ArrowVectorAccessor {
+
+    private final BigIntVector vector;
+
+    LongAccessor(BigIntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryLongAccessor extends ArrowVectorAccessor {
+    private final IntVector offsetVector;
+    private final long[] decodedDictionary;
+
+    DictionaryLongAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToLong(dictionary::decodeToLong)
+          .toArray();
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return decodedDictionary[offsetVector.get(rowId)];
+    }
+  }
+
+  private static class FloatAccessor extends ArrowVectorAccessor {
+
+    private final Float4Vector vector;
+
+    FloatAccessor(Float4Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final float getFloat(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryFloatAccessor extends ArrowVectorAccessor {
+    private final IntVector offsetVector;
+    private final float[] decodedDictionary;
+
+    DictionaryFloatAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = new float[dictionary.getMaxId() + 1];
+      for (int i = 0; i <= dictionary.getMaxId(); i++) {
+        decodedDictionary[i] = dictionary.decodeToFloat(i);
+      }
+    }
+
+    @Override
+    final float getFloat(int rowId) {
+      return decodedDictionary[offsetVector.get(rowId)];
+    }
+  }
+
+  private static class DoubleAccessor extends ArrowVectorAccessor {
+
+    private final Float8Vector vector;
+
+    DoubleAccessor(Float8Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final double getDouble(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryDoubleAccessor extends ArrowVectorAccessor {
+    private final IntVector offsetVector;
+    private final double[] decodedDictionary;
+
+    DictionaryDoubleAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToDouble(dictionary::decodeToDouble)
+          .toArray();
+    }
+
+    @Override
+    final double getDouble(int rowId) {
+      return decodedDictionary[offsetVector.get(rowId)];
+    }
+  }
+
+  private static class StringAccessor extends ArrowVectorAccessor {
+
+    private final IcebergArrowVectors.VarcharArrowVector vector;
+    private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
+
+    StringAccessor(IcebergArrowVectors.VarcharArrowVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final UTF8String getUTF8String(int rowId) {
+      vector.get(rowId, stringResult);
+      if (stringResult.isSet == 0) {
+        return null;
+      } else {
+        return UTF8String.fromAddress(
+            null,
+            stringResult.buffer.memoryAddress() + stringResult.start,
+            stringResult.end - stringResult.start);
+      }
+    }
+  }
+
+  private static class DictionaryStringAccessor extends ArrowVectorAccessor {
+    private final UTF8String[] decodedDictionary;
+    private final IntVector offsetVector;
+
+    DictionaryStringAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToObj(dictionary::decodeToBinary)
+          .map(binary -> UTF8String.fromBytes(binary.getBytes()))
+          .toArray(UTF8String[]::new);
+    }
+
+    @Override
+    final UTF8String getUTF8String(int rowId) {
+      int offset = offsetVector.get(rowId);
+      return decodedDictionary[offset];
+    }
+  }
+
+  private static class BinaryAccessor extends ArrowVectorAccessor {
+
+    private final VarBinaryVector vector;
+
+    BinaryAccessor(VarBinaryVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final byte[] getBinary(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryBinaryAccessor extends ArrowVectorAccessor {
+    private final IntVector offsetVector;
+    private final byte[][] decodedDictionary;
+
+    DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToObj(dictionary::decodeToBinary)
+          .map(binary -> binary.getBytes())
+          .toArray(byte[][]::new);
+    }
+
+    @Override
+    final byte[] getBinary(int rowId) {
+      int offset = offsetVector.get(rowId);
+      return decodedDictionary[offset];
+    }
+  }
+
+  private static class DateAccessor extends ArrowVectorAccessor {
+
+    private final DateDayVector vector;
+
+    DateAccessor(DateDayVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class TimestampAccessor extends ArrowVectorAccessor {
+
+    private final TimeStampMicroTZVector vector;
+
+    TimestampAccessor(TimeStampMicroTZVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class ArrayAccessor extends ArrowVectorAccessor {
+
+    private final ListVector vector;
+    private final ArrowColumnVector arrayData;
+
+    ArrayAccessor(ListVector vector) {
+      super(vector);
+      this.vector = vector;
+      this.arrayData = new ArrowColumnVector(vector.getDataVector());
+    }
+
+    @Override
+    final ColumnarArray getArray(int rowId) {
+      ArrowBuf offsets = vector.getOffsetBuffer();
+      int index = rowId * ListVector.OFFSET_WIDTH;
+      int start = offsets.getInt(index);
+      int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
+      return new ColumnarArray(arrayData, start, end - start);
+    }
+  }
+
+  /**
+   * Use {@link IcebergArrowColumnVector#getChild(int)} to get hold of the {@link ArrowColumnVector} vectors holding the
+   * struct values.
+   */
+  private static class StructAccessor extends ArrowVectorAccessor {
+    StructAccessor(StructVector structVector) {
+      super(structVector, IntStream.range(0, structVector.size())
+          .mapToObj(structVector::getVectorById)
+          .map(ArrowColumnVector::new)
+          .toArray(ArrowColumnVector[]::new));
+    }
+  }
+
+  private static class DecimalAccessor extends ArrowVectorAccessor {
+
+    private final IcebergArrowVectors.DecimalArrowVector vector;
+
+    DecimalAccessor(IcebergArrowVectors.DecimalArrowVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      return Decimal.apply(vector.getObject(rowId), precision, scale);
+    }
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  private abstract static class DictionaryDecimalAccessor extends ArrowVectorAccessor {
+    final Decimal[] cache;
+    Dictionary parquetDictionary;
+    final IntVector offsetVector;
+
+    private DictionaryDecimalAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.parquetDictionary = dictionary;
+      this.cache = new Decimal[dictionary.getMaxId() + 1];
+    }
+  }
+
+  private static class DictionaryDecimalBinaryAccessor extends DictionaryDecimalAccessor {
+
+    DictionaryDecimalBinaryAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector, dictionary);
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      int dictId = offsetVector.get(rowId);
+      if (cache[dictId] == null) {
+        cache[dictId] = Decimal.apply(
+            new BigInteger(parquetDictionary.decodeToBinary(dictId).getBytes()).longValue(),
+            precision,
+            scale);
+      }
+      return cache[dictId];
+    }
+  }
+
+  private static class DictionaryDecimalLongAccessor extends DictionaryDecimalAccessor {
+
+    DictionaryDecimalLongAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector, dictionary);
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      int dictId = offsetVector.get(rowId);
+      if (cache[dictId] == null) {
+        cache[dictId] = Decimal.apply(parquetDictionary.decodeToLong(dictId), precision, scale);
+      }
+      return cache[dictId];
+    }
+  }
+
+  private static class DictionaryDecimalIntAccessor extends DictionaryDecimalAccessor {
+
+    DictionaryDecimalIntAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector, dictionary);
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      int dictId = offsetVector.get(rowId);
+      if (cache[dictId] == null) {
+        cache[dictId] = Decimal.apply(parquetDictionary.decodeToInt(dictId), precision, scale);
+      }
+      return cache[dictId];
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
new file mode 100644
index 0000000..c76321e
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized read path. The
+ * {@link ColumnarBatch} returned is created by passing in the Arrow vectors populated via delegated read calls to
+ * {@linkplain VectorizedArrowReader VectorReader(s)}.
+ */
+public class ColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+  private final VectorizedArrowReader[] readers;
+  private final VectorHolder[] vectorHolders;
+
+  public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
+    this.readers = readers.stream()
+        .map(VectorizedArrowReader.class::cast)
+        .toArray(VectorizedArrowReader[]::new);
+    this.vectorHolders = new VectorHolder[readers.size()];
+  }
+
+  @Override
+  public final void setRowGroupInfo(PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
+    for (VectorizedArrowReader reader : readers) {
+      if (reader != null) {
+        reader.setRowGroupInfo(pageStore, metaData);
+      }
+    }
+  }
+
+  @Override
+  public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
+    Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
+    ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
+
+    if (reuse == null) {
+      closeVectors();
+    }
+
+    for (int i = 0; i < readers.length; i += 1) {
+      vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+      int numRowsInVector = vectorHolders[i].numValues();
+      Preconditions.checkState(
+          numRowsInVector == numRowsToRead,
+          "Number of rows in the vector %s didn't match expected %s ", numRowsInVector,
+          numRowsToRead);
+      arrowColumnVectors[i] =
+          IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+    }
+    ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
+    batch.setNumRows(numRowsToRead);
+    return batch;
+  }
+
+  private void closeVectors() {
+    for (int i = 0; i < vectorHolders.length; i++) {
+      if (vectorHolders[i] != null) {
+        // Release any resources used by the vector
+        if (vectorHolders[i].vector() != null) {
+          vectorHolders[i].vector().close();
+        }
+        vectorHolders[i] = null;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    for (VectorizedReader<?> reader : readers) {
+      reader.close();
+    }
+  }
+
+  @Override
+  public void setBatchSize(int batchSize) {
+    for (VectorizedArrowReader reader : readers) {
+      if (reader != null) {
+        reader.setBatchSize(batchSize);
+      }
+    }
+  }
+}
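
[Editor's note] Downstream, Spark consumes these batches through the standard ColumnarBatch/ColumnVector API, which the IcebergArrowColumnVector added below implements. A consumer sketch for illustration only, assuming column 0 holds a non-nested int column; the class is not part of the commit:

    import org.apache.spark.sql.vectorized.ColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    // Sketch only: reading values out of a returned batch.
    class BatchConsumerSketch {
      static long sumFirstIntColumn(ColumnarBatch batch) {
        ColumnVector col = batch.column(0);  // assumption: column 0 holds ints
        long sum = 0;
        for (int row = 0; row < batch.numRows(); row++) {
          if (!col.isNullAt(row)) {
            sum += col.getInt(row);
          }
        }
        return sum;
      }
    }
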
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
new file mode 100644
index 0000000..9d10cd9
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Implementation of Spark's {@link ColumnVector} interface. The code for this class is heavily inspired by Spark's
+ * {@link ArrowColumnVector}. The main difference is in how nullability checks are made: this class relies on
+ * {@link NullabilityHolder} instead of the validity vector in the Arrow vector.
+ */
+public class IcebergArrowColumnVector extends ColumnVector {
+
+  private final ArrowVectorAccessor accessor;
+  private final NullabilityHolder nullabilityHolder;
+
+  public IcebergArrowColumnVector(VectorHolder holder) {
+    super(SparkSchemaUtil.convert(holder.icebergType()));
+    this.nullabilityHolder = holder.nullabilityHolder();
+    this.accessor = ArrowVectorAccessors.getVectorAccessor(holder);
+  }
+
+  @Override
+  public void close() {
+    accessor.close();
+  }
+
+  @Override
+  public boolean hasNull() {
+    return nullabilityHolder.hasNulls();
+  }
+
+  @Override
+  public int numNulls() {
+    return nullabilityHolder.numNulls();
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return nullabilityHolder.isNullAt(rowId) == 1;
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return accessor.getBoolean(rowId);
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type - byte");
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type - short");
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return accessor.getInt(rowId);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    return accessor.getLong(rowId);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    return accessor.getFloat(rowId);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    return accessor.getDouble(rowId);
+  }
+
+  @Override
+  public ColumnarArray getArray(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getArray(rowId);
+  }
+
+  @Override
+  public ColumnarMap getMap(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type - map");
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getDecimal(rowId, precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getUTF8String(rowId);
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getBinary(rowId);
+  }
+
+  @Override
+  public ArrowColumnVector getChild(int ordinal) {
+    return accessor.childColumn(ordinal);
+  }
+
+  static ColumnVector forHolder(VectorHolder holder, int numRows) {
+    return holder.isDummy() ? new NullValuesColumnVector(numRows) :
+        new IcebergArrowColumnVector(holder);
+  }
+
+  public ArrowVectorAccessor vectorAccessor() {
+    return accessor;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/NullValuesColumnVector.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/NullValuesColumnVector.java
new file mode 100644
index 0000000..8770d13
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/NullValuesColumnVector.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class NullValuesColumnVector extends ColumnVector {
+
+  private final int numNulls;
+  private static final Type NULL_TYPE = Types.IntegerType.get();
+
+  public NullValuesColumnVector(int nValues) {
+    super(SparkSchemaUtil.convert(NULL_TYPE));
+    this.numNulls = nValues;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public boolean hasNull() {
+    return true;
+  }
+
+  @Override
+  public int numNulls() {
+    return numNulls;
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return true;
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ColumnarArray getArray(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ColumnarMap getMap(int ordinal) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected ColumnVector getChild(int ordinal) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
new file mode 100644
index 0000000..01cbe6f
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public class VectorizedSparkParquetReaders {
+
+  private VectorizedSparkParquetReaders() {
+  }
+
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      boolean setArrowValidityVector) {
+    return (ColumnarBatchReader)
+        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+            new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector));
+  }
+
+  private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
+    private final MessageType parquetSchema;
+    private final Schema icebergSchema;
+    private final BufferAllocator rootAllocator;
+    private final boolean setArrowValidityVector;
+
+    VectorizedReaderBuilder(
+        Schema expectedSchema,
+        MessageType parquetSchema,
+        boolean setArrowValidityVector) {
+      this.parquetSchema = parquetSchema;
+      this.icebergSchema = expectedSchema;
+      this.rootAllocator = ArrowAllocation.rootAllocator()
+          .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+      this.setArrowValidityVector = setArrowValidityVector;
+    }
+
+    @Override
+    public VectorizedReader<?> message(
+            Types.StructType expected, MessageType message,
+            List<VectorizedReader<?>> fieldReaders) {
+      GroupType groupType = message.asGroupType();
+      Map<Integer, VectorizedReader<?>> readersById = Maps.newHashMap();
+      List<Type> fields = groupType.getFields();
+
+      IntStream.range(0, fields.size())
+          .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos)));
+
+      List<Types.NestedField> icebergFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+
+      List<VectorizedReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
+          icebergFields.size());
+
+      for (Types.NestedField field : icebergFields) {
+        int id = field.fieldId();
+        VectorizedReader<?> reader = readersById.get(id);
+        if (reader != null) {
+          reorderedFields.add(reader);
+        } else {
+          reorderedFields.add(VectorizedArrowReader.nulls());
+        }
+      }
+      return new ColumnarBatchReader(reorderedFields);
+    }
+
+    @Override
+    public VectorizedReader<?> struct(
+        Types.StructType expected, GroupType groupType,
+        List<VectorizedReader<?>> fieldReaders) {
+      if (expected != null) {
+        throw new UnsupportedOperationException("Vectorized reads are not supported yet for struct fields");
+      }
+      return null;
+    }
+
+    @Override
+    public VectorizedReader<?> primitive(
+        org.apache.iceberg.types.Type.PrimitiveType expected,
+        PrimitiveType primitive) {
+
+      // Create arrow vector for this field
+      int parquetFieldId = primitive.getId().intValue();
+      ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
+      // Nested types not yet supported for vectorized reads
+      if (desc.getMaxRepetitionLevel() > 0) {
+        return null;
+      }
+      Types.NestedField icebergField = icebergSchema.findField(parquetFieldId);
+      if (icebergField == null) {
+        return null;
+      }
+      // Set the validity buffer if null checking is enabled in arrow
+      return new VectorizedArrowReader(desc, icebergField, rootAllocator, setArrowValidityVector);
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
new file mode 100644
index 0000000..eeb3ad5
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+class BatchDataReader extends BaseDataReader<ColumnarBatch> {
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  BatchDataReader(
+      CombinedScanTask task, Schema expectedSchema, FileIO fileIo,
+      EncryptionManager encryptionManager, boolean caseSensitive, int size) {
+    super(task, fileIo, encryptionManager);
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = size;
+  }
+
+  @Override
+  CloseableIterator<ColumnarBatch> open(FileScanTask task) {
+    CloseableIterable<ColumnarBatch> iter;
+    InputFile location = getInputFile(task);
+    Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
+    if (task.file().format() == FileFormat.PARQUET) {
+      iter = Parquet.read(location)
+          .project(expectedSchema)
+          .split(task.start(), task.length())
+          .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema,
+              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED))
+          .recordsPerBatch(batchSize)
+          .filter(task.residual())
+          .caseSensitive(caseSensitive)
+          // Spark eagerly consumes the batches, so the underlying allocated memory can be reused
+          // without subsequent reads clobbering each other. This improves read performance
+          // because each batch read does not have to pay the cost of allocating memory.
+          .reuseContainers()
+          .build();
+    } else {
+      throw new UnsupportedOperationException(
+          "Format: " + task.file().format() + " not supported for batched reads");
+    }
+    return iter.iterator();
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 1f3d26e..d205c22 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
@@ -43,10 +44,12 @@ import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -59,14 +62,16 @@ import org.apache.spark.sql.sources.v2.reader.Statistics;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
+import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
-    SupportsReportStatistics {
+class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
+    SupportsPushDownRequiredColumns, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
 
   private static final Filter[] NO_FILTERS = new Filter[0];
@@ -87,14 +92,17 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
   private final boolean localityPreferred;
+  private final boolean batchReadsEnabled;
+  private final int batchSize;
 
   // lazy variables
   private Schema schema = null;
   private StructType type = null; // cached because Spark accesses it multiple times
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+  private Boolean readUsingBatch = null;
 
   Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-         boolean caseSensitive, DataSourceOptions options) {
+      boolean caseSensitive, DataSourceOptions options) {
     this.table = table;
     this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null);
     this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
@@ -145,6 +153,13 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     this.io = io;
     this.encryptionManager = encryptionManager;
     this.caseSensitive = caseSensitive;
+
+    this.batchReadsEnabled = options.get("vectorization-enabled").map(Boolean::parseBoolean).orElse(
+        PropertyUtil.propertyAsBoolean(table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED, TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT));
+    this.batchSize = options.get("batch-size").map(Integer::parseInt).orElse(
+        PropertyUtil.propertyAsInt(table.properties(),
+          TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
   }
 
   private Schema lazySchema() {
@@ -178,6 +193,30 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     return lazyType();
   }
 
+  /**
+   * This is called in the Spark Driver when data is to be materialized into {@link ColumnarBatch}
+   */
+  @Override
+  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
+    Preconditions.checkState(enableBatchRead(), "Batched reads not enabled");
+    Preconditions.checkState(batchSize > 0, "Invalid batch size");
+    String tableSchemaString = SchemaParser.toJson(table.schema());
+    String expectedSchemaString = SchemaParser.toJson(lazySchema());
+
+    List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
+    for (CombinedScanTask task : tasks()) {
+      readTasks.add(new ReadTask<>(
+          task, tableSchemaString, expectedSchemaString, io, encryptionManager, caseSensitive, localityPreferred,
+          new BatchReaderFactory(batchSize)));
+    }
+    LOG.info("Batching input partitions with {} tasks.", readTasks.size());
+
+    return readTasks;
+  }
+
+  /**
+   * This is called in the Spark Driver when data is to be materialized into {@link InternalRow}
+   */
   @Override
   public List<InputPartition<InternalRow>> planInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
@@ -185,9 +224,9 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
-      readTasks.add(
-          new ReadTask(task, tableSchemaString, expectedSchemaString, io, encryptionManager,
-              caseSensitive, localityPreferred));
+      readTasks.add(new ReadTask<>(
+          task, tableSchemaString, expectedSchemaString, io, encryptionManager, caseSensitive, localityPreferred,
+          InternalRowReaderFactory.INSTANCE));
     }
 
     return readTasks;
@@ -249,6 +288,31 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     return new Stats(sizeInBytes, numRows);
   }
 
+  @Override
+  public boolean enableBatchRead() {
+    if (readUsingBatch == null) {
+      boolean allParquetFileScanTasks =
+          tasks().stream()
+              .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
+                  .stream()
+                  .allMatch(fileScanTask -> fileScanTask.file().format().equals(
+                      FileFormat.PARQUET)));
+
+      boolean atLeastOneColumn = lazySchema().columns().size() > 0;
+
+      boolean hasNoIdentityProjections = tasks().stream()
+          .allMatch(combinedScanTask -> combinedScanTask.files()
+              .stream()
+              .allMatch(fileScanTask -> fileScanTask.spec().identitySourceIds().isEmpty()));
+
+      boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
+
+      this.readUsingBatch = batchReadsEnabled && allParquetFileScanTasks && atLeastOneColumn &&
+          hasNoIdentityProjections && onlyPrimitives;
+    }
+    return readUsingBatch;
+  }
+
   private static void mergeIcebergHadoopConfs(
       Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
@@ -299,7 +363,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
       try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
         this.tasks = Lists.newArrayList(tasksIterable);
-      }  catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
       }
     }
@@ -310,11 +374,11 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   @Override
   public String toString() {
     return String.format(
-        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
-        table, lazySchema().asStruct(), filterExpressions, caseSensitive);
+        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s, batchedReads=%s)",
+        table, lazySchema().asStruct(), filterExpressions, caseSensitive, enableBatchRead());
   }
 
-  private static class ReadTask implements InputPartition<InternalRow>, Serializable {
+  private static class ReadTask<T> implements Serializable, InputPartition<T> {
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
@@ -322,6 +386,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
     private final boolean localityPreferred;
+    private final ReaderFactory<T> readerFactory;
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
@@ -329,7 +394,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
     private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
                      Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-                     boolean caseSensitive, boolean localityPreferred) {
+                     boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
       this.expectedSchemaString = expectedSchemaString;
@@ -338,12 +403,13 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
       this.caseSensitive = caseSensitive;
       this.localityPreferred = localityPreferred;
       this.preferredLocations = getPreferredLocations();
+      this.readerFactory = readerFactory;
     }
 
     @Override
-    public InputPartitionReader<InternalRow> createPartitionReader() {
-      return new RowDataReader(task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
-        encryptionManager.value(), caseSensitive);
+    public InputPartitionReader<T> createPartitionReader() {
+      return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
+          encryptionManager.value(), caseSensitive);
     }
 
     @Override
@@ -375,6 +441,40 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     }
   }
 
+  private interface ReaderFactory<T> extends Serializable {
+    InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO io,
+                                   EncryptionManager encryptionManager, boolean caseSensitive);
+  }
+
+  private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
+    private static final InternalRowReaderFactory INSTANCE = new InternalRowReaderFactory();
+
+    private InternalRowReaderFactory() {
+    }
+
+    @Override
+    public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
+                                                    FileIO io, EncryptionManager encryptionManager,
+                                                    boolean caseSensitive) {
+      return new RowDataReader(task, tableSchema, expectedSchema, io, encryptionManager, caseSensitive);
+    }
+  }
+
+  private static class BatchReaderFactory implements ReaderFactory<ColumnarBatch> {
+    private final int batchSize;
+
+    BatchReaderFactory(int batchSize) {
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
+                                                    FileIO io, EncryptionManager encryptionManager,
+                                                    boolean caseSensitive) {
+      return new BatchDataReader(task, expectedSchema, io, encryptionManager, caseSensitive, batchSize);
+    }
+  }
+
   private static class StructLikeInternalRow implements StructLike {
     private final DataType[] types;
     private InternalRow row = null;
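
The same behavior can also be configured per table rather than per read; a hedged sketch using the
property constants this commit adds to TableProperties (the "10000" batch size is an arbitrary
example value):

    table.updateProperties()
        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
        .set(TableProperties.PARQUET_BATCH_SIZE, "10000")
        .commit();
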
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 57e61ef..966a0e6 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark.data;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.ListType;
@@ -67,6 +68,23 @@ public abstract class AvroDataTest {
   }
 
   @Test
+  public void testStructWithRequiredFields() throws IOException {
+    writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(
+        Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asRequired))));
+  }
+
+  @Test
+  public void testStructWithOptionalFields() throws IOException {
+    writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(
+        Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asOptional))));
+  }
+
+  @Test
+  public void testNestedStruct() throws IOException {
+    writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(required(1, "struct", SUPPORTED_PRIMITIVES))));
+  }
+
+  @Test
   public void testArray() throws IOException {
     Schema schema = new Schema(
         required(0, "id", LongType.get()),
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index b5f0b71..f99c0fc 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -20,7 +20,9 @@
 package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +35,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -49,10 +52,13 @@ import org.apache.spark.unsafe.types.UTF8String;
 
 public class RandomData {
 
+  // Default percentage of values that are null for optional fields
+  public static final float DEFAULT_NULL_PERCENTAGE = 0.05f;
+
   private RandomData() {}
 
   public static List<Record> generateList(Schema schema, int numRecords, long seed) {
-    RandomDataGenerator generator = new RandomDataGenerator(schema, seed);
+    RandomDataGenerator generator = new RandomDataGenerator(schema, seed, DEFAULT_NULL_PERCENTAGE);
     List<Record> records = Lists.newArrayListWithExpectedSize(numRecords);
     for (int i = 0; i < numRecords; i += 1) {
       records.add((Record) TypeUtil.visit(schema, generator));
@@ -83,9 +89,27 @@ public class RandomData {
   }
 
   public static Iterable<Record> generate(Schema schema, int numRecords, long seed) {
+    return newIterable(() -> new RandomDataGenerator(schema, seed, DEFAULT_NULL_PERCENTAGE), schema, numRecords);
+  }
+
+  public static Iterable<Record> generate(Schema schema, int numRecords, long seed, float nullPercentage) {
+    return newIterable(() -> new RandomDataGenerator(schema, seed, nullPercentage), schema, numRecords);
+  }
+
+  public static Iterable<Record> generateFallbackData(Schema schema, int numRecords, long seed, long numDictRecords) {
+    return newIterable(() -> new FallbackDataGenerator(schema, seed, numDictRecords), schema, numRecords);
+  }
+
+  public static Iterable<GenericData.Record> generateDictionaryEncodableData(
+      Schema schema, int numRecords, long seed, float nullPercentage) {
+    return newIterable(() -> new DictionaryEncodedDataGenerator(schema, seed, nullPercentage), schema, numRecords);
+  }
+
+  private static Iterable<Record> newIterable(Supplier<RandomDataGenerator> newGenerator,
+                                              Schema schema, int numRecords) {
     return () -> new Iterator<Record>() {
-      private RandomDataGenerator generator = new RandomDataGenerator(schema, seed);
       private int count = 0;
+      private RandomDataGenerator generator = newGenerator.get();
 
       @Override
       public boolean hasNext() {
@@ -106,8 +130,14 @@ public class RandomData {
   private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> {
     private final Map<Type, org.apache.avro.Schema> typeToSchema;
     private final Random random;
-
-    private RandomDataGenerator(Schema schema, long seed) {
+    // Percentage of values that are null for optional fields
+    private final float nullPercentage;
+
+    private RandomDataGenerator(Schema schema, long seed, float nullPercentage) {
+      Preconditions.checkArgument(
+          0.0f <= nullPercentage && nullPercentage <= 1.0f,
+          "Percentage needs to be in the range (0.0, 1.0)");
+      this.nullPercentage = nullPercentage;
       this.typeToSchema = AvroSchemaUtil.convertTypes(schema.asStruct(), "test");
       this.random = new Random(seed);
     }
@@ -131,21 +161,23 @@ public class RandomData {
 
     @Override
     public Object field(Types.NestedField field, Supplier<Object> fieldResult) {
-      // return null 5% of the time when the value is optional
-      if (field.isOptional() && random.nextInt(20) == 1) {
+      if (field.isOptional() && isNull()) {
         return null;
       }
       return fieldResult.get();
     }
 
+    private boolean isNull() {
+      return random.nextFloat() < nullPercentage;
+    }
+
     @Override
     public Object list(Types.ListType list, Supplier<Object> elementResult) {
       int numElements = random.nextInt(20);
 
       List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
       for (int i = 0; i < numElements; i += 1) {
-        // return null 5% of the time when the value is optional
-        if (list.isElementOptional() && random.nextInt(20) == 1) {
+        if (list.isElementOptional() && isNull()) {
           result.add(null);
         } else {
           result.add(elementResult.get());
@@ -170,8 +202,7 @@ public class RandomData {
 
         keySet.add(key);
 
-        // return null 5% of the time when the value is optional
-        if (map.isValueOptional() && random.nextInt(20) == 1) {
+        if (map.isValueOptional() && isNull()) {
           result.put(key, null);
         } else {
           result.put(key, valueResult.get());
@@ -183,7 +214,7 @@ public class RandomData {
 
     @Override
     public Object primitive(Type.PrimitiveType primitive) {
-      Object result = RandomUtil.generatePrimitive(primitive, random);
+      Object result = randomValue(primitive, random);
       // For the primitives that Avro needs a different type than Spark, fix
       // them here.
       switch (primitive.typeId()) {
@@ -198,6 +229,10 @@ public class RandomData {
           return result;
       }
     }
+
+    protected Object randomValue(Type.PrimitiveType primitive, Random rand) {
+      return RandomUtil.generatePrimitive(primitive, rand);
+    }
   }
 
   private static class SparkRandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> {
@@ -295,4 +330,71 @@ public class RandomData {
       }
     }
   }
+
+  private static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType primitive, Random random) {
+    int value = random.nextInt(3);
+    switch (primitive.typeId()) {
+      case BOOLEAN:
+        return true; // doesn't really matter for booleans since they are not dictionary encoded
+      case INTEGER:
+      case DATE:
+        return value;
+      case FLOAT:
+        return (float) value;
+      case DOUBLE:
+        return (double) value;
+      case LONG:
+      case TIME:
+      case TIMESTAMP:
+        return (long) value;
+      case STRING:
+        return String.valueOf(value);
+      case FIXED:
+        byte[] fixed = new byte[((Types.FixedType) primitive).length()];
+        Arrays.fill(fixed, (byte) value);
+        return fixed;
+      case BINARY:
+        byte[] binary = new byte[value + 1];
+        Arrays.fill(binary, (byte) value);
+        return binary;
+      case DECIMAL:
+        Types.DecimalType type = (Types.DecimalType) primitive;
+        BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
+        return new BigDecimal(unscaled, type.scale());
+      default:
+        throw new IllegalArgumentException(
+            "Cannot generate random value for unknown type: " + primitive);
+    }
+  }
+
+  private static class DictionaryEncodedDataGenerator extends RandomDataGenerator {
+    private DictionaryEncodedDataGenerator(Schema schema, long seed, float nullPercentage) {
+      super(schema, seed, nullPercentage);
+    }
+
+    @Override
+    protected Object randomValue(Type.PrimitiveType primitive, Random random) {
+      return generateDictionaryEncodablePrimitive(primitive, random);
+    }
+  }
+
+  private static class FallbackDataGenerator extends RandomDataGenerator {
+    private final long dictionaryEncodedRows;
+    private long rowCount = 0;
+
+    private FallbackDataGenerator(Schema schema, long seed, long numDictionaryEncoded) {
+      super(schema, seed, DEFAULT_NULL_PERCENTAGE);
+      this.dictionaryEncodedRows = numDictionaryEncoded;
+    }
+
+    @Override
+    protected Object randomValue(Type.PrimitiveType primitive, Random rand) {
+      this.rowCount += 1;
+      if (rowCount > dictionaryEncodedRows) {
+        return RandomUtil.generatePrimitive(primitive, rand);
+      } else {
+        return generateDictionaryEncodablePrimitive(primitive, rand);
+      }
+    }
+  }
 }
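
A short, illustrative sketch of the new RandomData entry points as a test might call them (record
count, seed, null percentage, and dictionary-encoded row count are arbitrary example values):

    // roughly 25% of optional values come back null
    Iterable<Record> plain = RandomData.generate(schema, 10_000, 42L, 0.25f);

    // values drawn from a tiny domain so Parquet dictionary-encodes the columns
    Iterable<Record> dictionary =
        RandomData.generateDictionaryEncodableData(schema, 10_000, 42L, 0.25f);

    // the first 500 generated values are dictionary-encodable, the rest are plain random values
    Iterable<Record> fallback = RandomData.generateFallbackData(schema, 10_000, 42L, 500);
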
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 433f87c..f603757 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -29,13 +29,16 @@ import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.util.Collection;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.serde2.io.DateWritable;
@@ -53,6 +56,8 @@ import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import scala.collection.Seq;
@@ -78,6 +83,28 @@ public class TestHelpers {
     }
   }
 
+  public static void assertEqualsBatch(Types.StructType struct, Iterator<Record> expected, ColumnarBatch batch,
+                                       boolean checkArrowValidityVector) {
+    for (int rowId = 0; rowId < batch.numRows(); rowId++) {
+      List<Types.NestedField> fields = struct.fields();
+      InternalRow row = batch.getRow(rowId);
+      Record rec = expected.next();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i).type();
+        Object expectedValue = rec.get(i);
+        Object actualValue = row.isNullAt(i) ? null : row.get(i, convert(fieldType));
+        assertEqualsUnsafe(fieldType, expectedValue, actualValue);
+
+        if (checkArrowValidityVector) {
+          ColumnVector columnVector = batch.column(i);
+          ValueVector arrowVector = ((IcebergArrowColumnVector) columnVector).vectorAccessor().getVector();
+          Assert.assertEquals("Nullability doesn't match", expectedValue == null, arrowVector.isNull(rowId));
+        }
+      }
+    }
+  }
+
+
   private static void assertEqualsSafe(Types.ListType list, Collection<?> expected, List actual) {
     Type elementType = list.elementType();
     List<?> expectedElements = Lists.newArrayList(expected);
@@ -199,7 +226,7 @@ public class TestHelpers {
       Type fieldType = fields.get(i).type();
 
       Object expectedValue = rec.get(i);
-      Object actualValue = row.get(i, convert(fieldType));
+      Object actualValue = row.isNullAt(i) ? null : row.get(i, convert(fieldType));
 
       assertEqualsUnsafe(fieldType, expectedValue, actualValue);
     }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
index 1466dea..d7bd696 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
@@ -186,7 +186,7 @@ public class TestParquetAvroReader {
 
   @Test
   public void testCorrectness() throws IOException {
-    Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139);
+    Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 50_000, 34139);
 
     File testFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", testFile.delete());
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
index 0e97c37..dcfc873 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
@@ -74,7 +74,7 @@ public class TestParquetAvroWriter {
 
   @Test
   public void testCorrectness() throws IOException {
-    Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139);
+    Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 50_000, 34139);
 
     File testFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", testFile.delete());
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
index 4ff7844..c75a87a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -71,7 +71,7 @@ public class TestSparkParquetWriter {
 
   @Test
   public void testCorrectness() throws IOException {
-    int numRows = 250_000;
+    int numRows = 50_000;
     Iterable<InternalRow> records = RandomData.generateSpark(COMPLEX_SCHEMA, numRows, 19981);
 
     File testFile = temp.newFile();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
new file mode 100644
index 0000000..7f2d9c3
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet.vectorized;
+
+import java.io.IOException;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.spark.data.RandomData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {
+
+  @Override
+  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
+    return RandomData.generateDictionaryEncodableData(schema, numRecords, seed, nullPercentage);
+  }
+
+  @Test
+  @Override
+  @Ignore // Ignored since this code path is already tested in TestParquetVectorizedReads
+  public void testVectorizedReadsWithNewContainers() throws IOException {
+
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
new file mode 100644
index 0000000..ad9d020
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet.vectorized;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.RandomData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestParquetDictionaryFallbackToPlainEncodingVectorizedReads extends TestParquetVectorizedReads {
+  private static final int NUM_ROWS = 1_000_000;
+
+  @Override
+  protected int getNumRows() {
+    return NUM_ROWS;
+  }
+
+  @Override
+  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
+    // TODO: take into account nullPercentage when generating fallback encoding data
+    return RandomData.generateFallbackData(schema, numRecords, seed, numRecords / 20);
+  }
+
+  @Override
+  FileAppender<GenericData.Record> getParquetWriter(Schema schema, File testFile) throws IOException {
+    return Parquet.write(Files.localOutput(testFile))
+        .schema(schema)
+        .named("test")
+        .set(TableProperties.PARQUET_DICT_SIZE_BYTES, "512000")
+        .build();
+  }
+
+  @Test
+  @Override
+  @Ignore // Fallback encoding not triggered when data is mostly null
+  public void testMostlyNullsForOptionalFields() {
+
+  }
+
+  @Test
+  @Override
+  @Ignore // Ignored since this code path is already tested in TestParquetVectorizedReads
+  public void testVectorizedReadsWithNewContainers() throws IOException {
+
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
new file mode 100644
index 0000000..3e4f5f9
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet.vectorized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.AvroDataTest;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestParquetVectorizedReads extends AvroDataTest {
+  private static final int NUM_ROWS = 200_000;
+
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    writeAndValidate(schema, getNumRows(), 0L, RandomData.DEFAULT_NULL_PERCENTAGE, false, true);
+  }
+
+  private void writeAndValidate(
+      Schema schema, int numRecords, long seed, float nullPercentage,
+      boolean setAndCheckArrowValidityVector, boolean reuseContainers)
+      throws IOException {
+    // Write test data
+    Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(
+        schema,
+        type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
+
+    Iterable<GenericData.Record> expected = generateData(schema, numRecords, seed, nullPercentage);
+
+    // write a test parquet file using iceberg writer
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<GenericData.Record> writer = getParquetWriter(schema, testFile)) {
+      writer.addAll(expected);
+    }
+    assertRecordsMatch(schema, numRecords, expected, testFile, setAndCheckArrowValidityVector, reuseContainers);
+  }
+
+  protected int getNumRows() {
+    return NUM_ROWS;
+  }
+
+  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
+    return RandomData.generate(schema, numRecords, seed, nullPercentage);
+  }
+
+  FileAppender<GenericData.Record> getParquetWriter(Schema schema, File testFile) throws IOException {
+    return Parquet.write(Files.localOutput(testFile))
+        .schema(schema)
+        .named("test")
+        .build();
+  }
+
+  private void assertRecordsMatch(
+      Schema schema, int expectedSize, Iterable<GenericData.Record> expected, File testFile,
+      boolean setAndCheckArrowValidityBuffer, boolean reuseContainers)
+      throws IOException {
+    Parquet.ReadBuilder readBuilder = Parquet.read(Files.localInput(testFile))
+        .project(schema)
+        .recordsPerBatch(10000)
+        .createBatchedReaderFunc(type -> VectorizedSparkParquetReaders.buildReader(
+            schema,
+            type,
+            setAndCheckArrowValidityBuffer));
+    if (reuseContainers) {
+      readBuilder.reuseContainers();
+    }
+    try (CloseableIterable<ColumnarBatch> batchReader =
+        readBuilder.build()) {
+      Iterator<GenericData.Record> expectedIter = expected.iterator();
+      Iterator<ColumnarBatch> batches = batchReader.iterator();
+      int numRowsRead = 0;
+      while (batches.hasNext()) {
+        ColumnarBatch batch = batches.next();
+        numRowsRead += batch.numRows();
+        TestHelpers.assertEqualsBatch(schema.asStruct(), expectedIter, batch, setAndCheckArrowValidityBuffer);
+      }
+      Assert.assertEquals(expectedSize, numRowsRead);
+    }
+  }
+
+  @Test
+  @Ignore
+  public void testArray() {
+  }
+
+  @Test
+  @Ignore
+  public void testArrayOfStructs() {
+  }
+
+  @Test
+  @Ignore
+  public void testMap() {
+  }
+
+  @Test
+  @Ignore
+  public void testNumericMapKey() {
+  }
+
+  @Test
+  @Ignore
+  public void testComplexMapKey() {
+  }
+
+  @Test
+  @Ignore
+  public void testMapOfStructs() {
+  }
+
+  @Test
+  @Ignore
+  public void testMixedTypes() {
+  }
+
+  @Test
+  @Override
+  public void testNestedStruct() {
+    AssertHelpers.assertThrows(
+        "Vectorized reads are not supported yet for struct fields",
+        UnsupportedOperationException.class,
+        "Vectorized reads are not supported yet for struct fields",
+        () -> VectorizedSparkParquetReaders.buildReader(
+            TypeUtil.assignIncreasingFreshIds(new Schema(required(
+                1,
+                "struct",
+                SUPPORTED_PRIMITIVES))),
+            new MessageType("struct", new GroupType(Type.Repetition.OPTIONAL, "struct").withId(1)),
+            false));
+  }
+
+  @Test
+  public void testMostlyNullsForOptionalFields() throws IOException {
+    writeAndValidate(
+        TypeUtil.assignIncreasingFreshIds(new Schema(SUPPORTED_PRIMITIVES.fields())),
+        getNumRows(),
+        0L,
+        0.99f,
+        false,
+        true);
+  }
+
+  @Test
+  public void testSettingArrowValidityVector() throws IOException {
+    writeAndValidate(new Schema(
+            Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asOptional)),
+        getNumRows(), 0L, RandomData.DEFAULT_NULL_PERCENTAGE, true, true);
+  }
+
+  @Test
+  public void testVectorizedReadsWithNewContainers() throws IOException {
+    writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(SUPPORTED_PRIMITIVES.fields())),
+        getNumRows(), 0L, RandomData.DEFAULT_NULL_PERCENTAGE, true, false);
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
index fcecb17..f171fdb 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -48,15 +49,15 @@ import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.Files.localOutput;
 
+@RunWith(Parameterized.class)
 public class TestParquetScan extends AvroDataTest {
   private static final Configuration CONF = new Configuration();
 
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
   private static SparkSession spark = null;
 
   @BeforeClass
@@ -71,6 +72,23 @@ public class TestParquetScan extends AvroDataTest {
     currentSpark.stop();
   }
 
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { false },
+        new Object[] { true },
+    };
+  }
+
+  private final boolean vectorized;
+
+  public TestParquetScan(boolean vectorized) {
+    this.vectorized = vectorized;
+  }
+
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     Assume.assumeTrue("Cannot handle non-string map keys in parquet-avro",
@@ -108,6 +126,7 @@ public class TestParquetScan extends AvroDataTest {
         .build();
 
     table.newAppend().appendFile(file).commit();
+    table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized)).commit();
 
     Dataset<Row> df = spark.read()
         .format("iceberg")
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index 41b0091..8bb9518 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataWriter;
@@ -66,14 +67,20 @@ public class TestSparkReadProjection extends TestReadProjection {
   @Parameterized.Parameters
   public static Object[][] parameters() {
     return new Object[][] {
-        new Object[] { "parquet" },
-        new Object[] { "avro" },
-        new Object[] { "orc" }
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false }
     };
   }
 
-  public TestSparkReadProjection(String format) {
+  private final FileFormat format;
+  private final boolean vectorized;
+
+  public TestSparkReadProjection(String format, boolean vectorized) {
     super(format);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ROOT));
+    this.vectorized = vectorized;
   }
 
   @BeforeClass
@@ -96,9 +103,7 @@ public class TestSparkReadProjection extends TestReadProjection {
     File dataFolder = new File(location, "data");
     Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
 
-    FileFormat fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
-
-    File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString()));
+    File testFile = new File(dataFolder, format.addExtension(UUID.randomUUID().toString()));
 
     Table table = TestTables.create(location, desc, writeSchema, PartitionSpec.unpartitioned());
     try {
@@ -106,7 +111,7 @@ public class TestSparkReadProjection extends TestReadProjection {
       // When tables are created, the column ids are reassigned.
       Schema tableSchema = table.schema();
 
-      switch (fileFormat) {
+      switch (format) {
         case AVRO:
           try (FileAppender<Record> writer = Avro.write(localOutput(testFile))
               .createWriterFunc(DataWriter::create)
@@ -143,6 +148,8 @@ public class TestSparkReadProjection extends TestReadProjection {
 
       table.newAppend().appendFile(file).commit();
 
+      table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized)).commit();
+
       // rewrite the read schema for the table's reassigned ids
       Map<Integer, Integer> idMapping = Maps.newHashMap();
       for (int id : allIds(writeSchema)) {