You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/10/07 19:08:37 UTC

[incubator-iceberg] branch vectorized-read updated: Improve vectorized read performance for non-dictionary encoded data (#516)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/vectorized-read by this push:
     new 256a052  Improve vectorized read performance for non-dictionary encoded data (#516)
256a052 is described below

commit 256a0522abc6a88765e3b72577f30162441c4f04
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Mon Oct 7 12:08:32 2019 -0700

    Improve vectorized read performance for non-dictionary encoded data (#516)
    
    * Resolve conflicts
    * New column vector implementation that removes bottle neck of null checks
    * Miscallenaous changes to improve performance
    * Enable benchmark for all types except string
    * Vectorize reading of definition levels.
    Add back string type for benchmarks. Introduce null data in the benchmark,
    As of this commit, performance of iceberg vectorized read path for all data types
    except string is on par with Spark. For string data type, iceberg vectorized is
    ~5-10% slower than spark vectorized path.
    * Add benchmarks for all primitive types
    * Refactoring and cleanup
---
 .../org/apache/iceberg/data/TableScanIterable.java |   1 -
 .../iceberg/parquet/BatchedColumnIterator.java     |  72 ++-
 .../iceberg/parquet/BatchedPageIterator.java       | 394 +++---------
 .../org/apache/iceberg/parquet/BytesReader.java    |  31 +
 .../iceberg/parquet/ColumnarBatchReader.java       |  22 +-
 .../apache/iceberg/parquet/NullabilityHolder.java  |  56 ++
 .../org/apache/iceberg/parquet/ParquetUtil.java    |  62 ++
 .../org/apache/iceberg/parquet/VectorReader.java   | 311 +++++----
 .../iceberg/parquet/VectorizedValuesReader.java    | 713 +++++++++++++++++++++
 .../parquet/arrow/IcebergArrowColumnVector.java    | 521 +++++++++++++++
 .../parquet/arrow/IcebergDecimalArrowVector.java   |  36 ++
 .../parquet/arrow/IcebergVarBinaryArrowVector.java |  51 ++
 .../parquet/arrow/IcebergVarcharArrowVector.java   |  52 ++
 .../spark/source/IcebergSourceBenchmark.java       |   2 +-
 .../source/IcebergSourceFlatDataBenchmark.java     |  29 +-
 .../IcebergSourceFlatParquetDataReadBenchmark.java | 245 -------
 ...IcebergSourceFlatParquetDataWriteBenchmark.java |  16 +-
 .../VectorizedIcebergSourceBenchmark.java          | 221 +++++++
 .../vectorized/VectorizedReadFloatsBenchmark.java} |  52 +-
 ...orizedReadFloatsTwentyPercentNullBenchmark.java |  42 ++
 .../VectorizedReadIntBackedDecimalsBenchmark.java} |  53 +-
 .../VectorizedReadIntegersBenchmark.java}          |  53 +-
 ...izedReadIntegersTwentyPercentNullBenchmark.java |  43 ++
 .../vectorized/VectorizedReadLongsBenchmark.java}  |  50 +-
 ...torizedReadLongsTwentyPercentNullBenchmark.java |  42 ++
 .../VectorizedReadPrimitivesBenchmark.java         |  79 +++
 .../VectorizedReadStringsBenchmark.java}           |  54 +-
 ...rizedReadStringsTwentyPercentNullBenchmark.java |  43 ++
 .../data/vector/VectorizedSparkParquetReaders.java |  18 +-
 .../org/apache/iceberg/spark/source/Reader.java    |   4 +-
 .../apache/iceberg/spark/data/AvroDataTest.java    |  43 +-
 .../org/apache/iceberg/spark/data/TestHelpers.java |   3 +-
 .../iceberg/spark/data/TestParquetAvroReader.java  |  19 -
 .../iceberg/spark/data/TestParquetAvroWriter.java  |  12 -
 .../iceberg/spark/data/TestSparkParquetReader.java |  15 -
 .../data/TestSparkParquetVectorizedReader.java     |   3 +-
 .../iceberg/spark/data/TestSparkParquetWriter.java |  10 -
 versions.lock                                      |  22 +-
 versions.props                                     |   2 +-
 39 files changed, 2530 insertions(+), 967 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 3f6b56f..fca3958 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.avro.DataReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expressions;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java
index 85d3d50..0b3bfc2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.parquet;
 
 import java.io.IOException;
-
 import org.apache.arrow.vector.FieldVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -80,12 +79,57 @@ public class BatchedColumnIterator {
   /**
    * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
    */
-  public void nextBatchNumericNonDecimal(FieldVector fieldVector, int typeWidth) {
+  public void nextBatchIntegers(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
+    int rowsReadSoFar = 0;
+    while (rowsReadSoFar < batchSize && hasNext()) {
+      advance();
+      int rowsInThisBatch = batchedPageIterator.nextBatchIntegers(fieldVector, batchSize - rowsReadSoFar,
+              rowsReadSoFar, typeWidth, holder);
+      rowsReadSoFar += rowsInThisBatch;
+      this.valuesRead += rowsInThisBatch;
+      fieldVector.setValueCount(rowsReadSoFar);
+    }
+  }
+
+  /**
+   * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
+   */
+  public void nextBatchLongs(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
+    int rowsReadSoFar = 0;
+    while (rowsReadSoFar < batchSize && hasNext()) {
+      advance();
+      int rowsInThisBatch = batchedPageIterator.nextBatchLongs(fieldVector, batchSize - rowsReadSoFar,
+          rowsReadSoFar, typeWidth, holder);
+      rowsReadSoFar += rowsInThisBatch;
+      this.valuesRead += rowsInThisBatch;
+      fieldVector.setValueCount(rowsReadSoFar);
+    }
+  }
+
+  /**
+   * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
+   */
+  public void nextBatchFloats(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
+    int rowsReadSoFar = 0;
+    while (rowsReadSoFar < batchSize && hasNext()) {
+      advance();
+      int rowsInThisBatch = batchedPageIterator.nextBatchFloats(fieldVector, batchSize - rowsReadSoFar,
+          rowsReadSoFar, typeWidth, holder);
+      rowsReadSoFar += rowsInThisBatch;
+      this.valuesRead += rowsInThisBatch;
+      fieldVector.setValueCount(rowsReadSoFar);
+    }
+  }
+
+  /**
+   * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
+   */
+  public void nextBatchDoubles(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchNumericNonDecimal(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, typeWidth);
+      int rowsInThisBatch = batchedPageIterator.nextBatchDoubles(fieldVector, batchSize - rowsReadSoFar,
+          rowsReadSoFar, typeWidth, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
@@ -95,12 +139,12 @@ public class BatchedColumnIterator {
   /**
    * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
    */
-  public void nextBatchIntLongBackedDecimal(FieldVector fieldVector, int typeWidth) {
+  public void nextBatchIntLongBackedDecimal(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = batchedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, typeWidth);
+              rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
@@ -110,12 +154,12 @@ public class BatchedColumnIterator {
   /**
    * Method for reading a batch of decimals backed by fixed length byte array parquet data type.
    */
-  public void nextBatchFixedLengthDecimal(FieldVector fieldVector, int typeWidth) {
+  public void nextBatchFixedLengthDecimal(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = batchedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, typeWidth);
+              rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
@@ -125,12 +169,12 @@ public class BatchedColumnIterator {
   /**
    * Method for reading a batch of variable width data type (ENUM, JSON, UTF8, BSON).
    */
-  public void nextBatchVarWidthType(FieldVector fieldVector) {
+  public void nextBatchVarWidthType(FieldVector fieldVector, NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = batchedPageIterator.nextBatchVarWidthType(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar);
+              rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
@@ -140,12 +184,12 @@ public class BatchedColumnIterator {
   /**
    * Method for reading batches of fixed width binary type (e.g. BYTE[7]).
    */
-  public void nextBatchFixedWidthBinary(FieldVector fieldVector, int typeWidth) {
+  public void nextBatchFixedWidthBinary(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = batchedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, typeWidth);
+              rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
@@ -155,12 +199,12 @@ public class BatchedColumnIterator {
   /**
    * Method for reading batches of booleans.
    */
-  public void nextBatchBoolean(FieldVector fieldVector) {
+  public void nextBatchBoolean(FieldVector fieldVector, NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = batchedPageIterator.nextBatchBoolean(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar);
+              rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java
index fac9937..d65c858 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java
@@ -20,8 +20,10 @@
 package org.apache.iceberg.parquet;
 
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.*;
+import java.io.IOException;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
@@ -39,14 +41,12 @@ import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import static java.lang.String.format;
-import static org.apache.parquet.column.ValuesType.*;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 
 public class BatchedPageIterator {
     private static final Logger LOG = LoggerFactory.getLogger(BatchedPageIterator.class);
+    private VectorizedValuesReader vectorizedValuesReader;
     private final int batchSize;
 
     public BatchedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
@@ -75,7 +75,6 @@ public class BatchedPageIterator {
     private BytesReader bytesReader = null;
     private ValuesReader valuesReader = null;
 
-
     public void setPage(DataPage page) {
         Preconditions.checkNotNull(page, "Cannot read from null page");
         this.page = page;
@@ -123,104 +122,86 @@ public class BatchedPageIterator {
      * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
      * vector. It appropriately sets the validity buffer in the Arrow vector.
      */
-    public int nextBatchNumericNonDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-                                          final int typeWidth) {
+    public int nextBatchIntegers(final FieldVector vector, final int expectedBatchSize,
+        final int numValsInVector,
+        final int typeWidth, NullabilityHolder holder) {
         final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
         if (actualBatchSize <= 0) {
             return 0;
         }
-        int ordinal = numValsInVector;
-        int valsRead = 0;
-        int numNonNulls = 0;
-        int startWriteValIdx = numValsInVector;
-        int maxDefLevel = desc.getMaxDefinitionLevel();
-        ArrowBuf validityBuffer = vector.getValidityBuffer();
-        ArrowBuf dataBuffer = vector.getDataBuffer();
-        int defLevel = definitionLevels.nextInt();
-        while (valsRead < actualBatchSize) {
-            numNonNulls = 0;
-            while (valsRead < actualBatchSize && defLevel == maxDefLevel) {
-                BitVectorHelper.setValidityBitToOne(validityBuffer, ordinal);
-                numNonNulls++;
-                valsRead++;
-                ordinal++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-            if (numNonNulls > 0) {
-                ByteBuffer buffer = bytesReader.getBuffer(numNonNulls * typeWidth);
-                dataBuffer.setBytes(startWriteValIdx * typeWidth, buffer);
-                startWriteValIdx += numNonNulls;
-            }
+        vectorizedValuesReader.readBatchOfIntegers(vector, numValsInVector, typeWidth, actualBatchSize, holder, bytesReader);
+        triplesRead += actualBatchSize;
+        this.hasNext = triplesRead < triplesCount;
+        return actualBatchSize;
+    }
 
-            while (valsRead < actualBatchSize && defLevel < maxDefLevel) {
-                BitVectorHelper.setValidityBit(validityBuffer, ordinal, 0);
-                valsRead++;
-                startWriteValIdx++;
-                ordinal++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
+    /**
+     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
+     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
+     * vector. It appropriately sets the validity buffer in the Arrow vector.
+     */
+    public int nextBatchLongs(final FieldVector vector, final int expectedBatchSize,
+        final int numValsInVector,
+        final int typeWidth, NullabilityHolder holder) {
+        final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
+        if (actualBatchSize <= 0) {
+            return 0;
         }
-        triplesRead += valsRead;
+        vectorizedValuesReader.readBatchOfLongs(vector, numValsInVector, typeWidth, actualBatchSize, holder, bytesReader);
+        triplesRead += actualBatchSize;
         this.hasNext = triplesRead < triplesCount;
         return actualBatchSize;
     }
 
     /**
-     * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
-     * Arrow stores all decimals in 16 bytes. This method provides the necessary padding to the decimals read.
+     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
+     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
+     * vector. It appropriately sets the validity buffer in the Arrow vector.
      */
-    public int nextBatchIntLongBackedDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-                                             final int typeWidth) {
+    public int nextBatchFloats(final FieldVector vector, final int expectedBatchSize,
+        final int numValsInVector,
+        final int typeWidth, NullabilityHolder holder) {
         final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
         if (actualBatchSize <= 0) {
             return 0;
         }
-        int ordinal = numValsInVector;
-        int valsRead = 0;
-        int numNonNulls = 0;
-        int startWriteValIdx = numValsInVector;
-        int maxDefLevel = desc.getMaxDefinitionLevel();
-        ArrowBuf validityBuffer = vector.getValidityBuffer();
-        ArrowBuf dataBuffer = vector.getDataBuffer();
-        int defLevel = definitionLevels.nextInt();
-        while (valsRead < actualBatchSize) {
-            numNonNulls = 0;
-            while (valsRead < actualBatchSize && defLevel == maxDefLevel) {
-                BitVectorHelper.setValidityBitToOne(validityBuffer, ordinal);
-                numNonNulls++;
-                valsRead++;
-                ordinal++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
+        vectorizedValuesReader.readBatchOfFloats(vector, numValsInVector, typeWidth, actualBatchSize, holder, bytesReader);
+        triplesRead += actualBatchSize;
+        this.hasNext = triplesRead < triplesCount;
+        return actualBatchSize;
+    }
 
-            for (int i = 0; i < numNonNulls; i++) {
-                try {
-                    byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-                    bytesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-                    dataBuffer.setBytes(startWriteValIdx * DecimalVector.TYPE_WIDTH, byteArray);
-                    startWriteValIdx++;
-                } catch (RuntimeException e) {
-                    throw handleRuntimeException(e);
-                }
-            }
+    /**
+     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
+     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
+     * vector. It appropriately sets the validity buffer in the Arrow vector.
+     */
+    public int nextBatchDoubles(final FieldVector vector, final int expectedBatchSize,
+        final int numValsInVector,
+        final int typeWidth, NullabilityHolder holder) {
+        final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
+        if (actualBatchSize <= 0) {
+            return 0;
+        }
+        vectorizedValuesReader.readBatchOfDoubles(vector, numValsInVector, typeWidth, actualBatchSize, holder, bytesReader);
+        triplesRead += actualBatchSize;
+        this.hasNext = triplesRead < triplesCount;
+        return actualBatchSize;
+    }
 
-            while (valsRead < actualBatchSize && defLevel < maxDefLevel) {
-                BitVectorHelper.setValidityBit(validityBuffer, ordinal, 0);
-                valsRead++;
-                startWriteValIdx++;
-                ordinal++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
+    /**
+     * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
+     * Arrow stores all decimals in 16 bytes. This method provides the necessary padding to the decimals read.
+     */
+    public int nextBatchIntLongBackedDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+                                             final int typeWidth, NullabilityHolder nullabilityHolder) {
+        final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
+        if (actualBatchSize <= 0) {
+            return 0;
         }
-        triplesRead += valsRead;
+        vectorizedValuesReader.readBatchOfIntLongBackedDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder,
+            bytesReader);
+        triplesRead += actualBatchSize;
         this.hasNext = triplesRead < triplesCount;
         return actualBatchSize;
     }
@@ -233,60 +214,13 @@ public class BatchedPageIterator {
      * Arrow vector is indeed little endian.
      */
     public int nextBatchFixedLengthDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-                                           final int typeWidth) {
+                                           final int typeWidth, NullabilityHolder nullabilityHolder) {
         final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
         if (actualBatchSize <= 0) {
             return 0;
         }
-        int ordinal = numValsInVector;
-        int valsRead = 0;
-        int numNonNulls = 0;
-        int numNulls = 0;
-        int maxDefLevel = desc.getMaxDefinitionLevel();
-        int defLevel = definitionLevels.nextInt();
-        while (valsRead < actualBatchSize) {
-            numNonNulls = 0;
-            numNulls = 0;
-            while (valsRead < actualBatchSize && defLevel == maxDefLevel) {
-                numNonNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-
-            for (int i = 0; i < numNonNulls; i++) {
-                try {
-                    byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-                    //bytesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-                    bytesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-                   ((DecimalVector) vector).setBigEndian(ordinal, byteArray);
-                    ordinal++;
-                } catch (RuntimeException e) {
-                    throw handleRuntimeException(e);
-                }
-            }
-
-
-            while (valsRead < actualBatchSize && defLevel < maxDefLevel) {
-                numNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-            if (numNulls > 0) {
-                for (int i = 0; i < numNulls; i++) {
-                    try {
-                        ((DecimalVector) vector).setNull(ordinal);
-                        ordinal++;
-                    } catch (RuntimeException e) {
-                        throw handleRuntimeException(e);
-                    }
-                }
-            }
-        }
-        triplesRead += valsRead;
+        vectorizedValuesReader.readBatchOfFixedLengthDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, bytesReader);
+        triplesRead += actualBatchSize;
         this.hasNext = triplesRead < triplesCount;
         return actualBatchSize;
     }
@@ -294,57 +228,14 @@ public class BatchedPageIterator {
     /**
      * Method for reading a batch of variable width data type (ENUM, JSON, UTF8, BSON).
      */
-    public int nextBatchVarWidthType(final FieldVector vector, final int expectedBatchSize, final int numValsInVector) {
+    public int nextBatchVarWidthType(final FieldVector vector, final int expectedBatchSize, final int numValsInVector
+        , NullabilityHolder nullabilityHolder) {
         final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
         if (actualBatchSize <= 0) {
             return 0;
         }
-        int ordinal = numValsInVector;
-        int valsRead = 0;
-        int numNonNulls = 0;
-        int numNulls = 0;
-        int maxDefLevel = desc.getMaxDefinitionLevel();
-        int defLevel = definitionLevels.nextInt();
-        while (valsRead < actualBatchSize) {
-            numNonNulls = 0;
-            numNulls = 0;
-            while (valsRead < actualBatchSize && defLevel == maxDefLevel) {
-                numNonNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-
-            for (int i = 0; i < numNonNulls; i++) {
-                try {
-                    ((BaseVariableWidthVector) vector).setSafe(ordinal, valuesReader.readBytes().getBytesUnsafe());
-                    ordinal++;
-                } catch (RuntimeException e) {
-                    throw handleRuntimeException(e);
-                }
-            }
-
-
-            while (valsRead < actualBatchSize && defLevel < maxDefLevel) {
-                numNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-            if (numNulls > 0) {
-                for (int i = 0; i < numNulls; i++) {
-                    try {
-                        ((BaseVariableWidthVector) vector).setNull(ordinal);
-                        ordinal++;
-                    } catch (RuntimeException e) {
-                        throw handleRuntimeException(e);
-                    }
-                }
-            }
-        }
-        triplesRead += valsRead;
+        vectorizedValuesReader.readBatchVarWidth(vector, numValsInVector, actualBatchSize, nullabilityHolder, bytesReader);
+        triplesRead += actualBatchSize;
         this.hasNext = triplesRead < triplesCount;
         return actualBatchSize;
     }
@@ -355,60 +246,13 @@ public class BatchedPageIterator {
      * fixed width binary from parquet and stored in a {@link VarBinaryVector} in Arrow.
      */
     public int nextBatchFixedWidthBinary(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-                                         final int typeWidth) {
+                                         final int typeWidth, NullabilityHolder nullabilityHolder) {
         final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
         if (actualBatchSize <= 0) {
             return 0;
         }
-        int ordinal = numValsInVector;
-        int valsRead = 0;
-        int numNonNulls = 0;
-        int numNulls = 0;
-        int maxDefLevel = desc.getMaxDefinitionLevel();
-        int defLevel = definitionLevels.nextInt();
-        while (valsRead < actualBatchSize) {
-            numNonNulls = 0;
-            numNulls = 0;
-            while (valsRead < actualBatchSize && defLevel == maxDefLevel) {
-                numNonNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-
-            for (int i = 0; i < numNonNulls; i++) {
-                try {
-                    byte[] byteArray = new byte[typeWidth];
-                    bytesReader.getBuffer(typeWidth).get(byteArray);
-                    ((VarBinaryVector) vector).setSafe(ordinal, byteArray);
-                    ordinal++;
-                } catch (RuntimeException e) {
-                    System.out.println("Ordinal: " + ordinal);
-                    throw handleRuntimeException(e);
-                }
-            }
-
-
-            while (valsRead < actualBatchSize && defLevel < maxDefLevel) {
-                numNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-            if (numNulls > 0) {
-                for (int i = 0; i < numNulls; i++) {
-                    try {
-                        ((VarBinaryVector) vector).setNull(ordinal);
-                        ordinal++;
-                    } catch (RuntimeException e) {
-                        throw handleRuntimeException(e);
-                    }
-                }
-            }
-        }
-        triplesRead += valsRead;
+        vectorizedValuesReader.readBatchOfFixedWidthBinary(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, bytesReader);
+        triplesRead += actualBatchSize;
         this.hasNext = triplesRead < triplesCount;
         return actualBatchSize;
     }
@@ -416,57 +260,14 @@ public class BatchedPageIterator {
     /**
      * Method for reading batches of booleans.
      */
-    public int nextBatchBoolean(final FieldVector vector, final int expectedBatchSize, final int numValsInVector) {
+    public int nextBatchBoolean(final FieldVector vector, final int expectedBatchSize, final int numValsInVector, NullabilityHolder nullabilityHolder) {
         final int actualBatchSize = Math.min(expectedBatchSize, triplesCount - triplesRead);
         if (actualBatchSize <= 0) {
             return 0;
         }
-        int ordinal = numValsInVector;
-        int valsRead = 0;
-        int numNonNulls = 0;
-        int numNulls = 0;
-        int maxDefLevel = desc.getMaxDefinitionLevel();
-        int defLevel = definitionLevels.nextInt();
-        while (valsRead < actualBatchSize) {
-            numNonNulls = 0;
-            numNulls = 0;
-            while (valsRead < actualBatchSize && defLevel == maxDefLevel) {
-                numNonNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-
-            for (int i = 0; i < numNonNulls; i++) {
-                try {
-                    ((BitVector) vector).setSafe(ordinal, ((valuesReader.readBoolean() == false) ? 0 : 1));
-                    ordinal++;
-                } catch (RuntimeException e) {
-                    throw handleRuntimeException(e);
-                }
-            }
-
-
-            while (valsRead < actualBatchSize && defLevel < maxDefLevel) {
-                numNulls++;
-                valsRead++;
-                if (valsRead < actualBatchSize) {
-                    defLevel = definitionLevels.nextInt();
-                }
-            }
-            if (numNulls > 0) {
-                for (int i = 0; i < numNulls; i++) {
-                    try {
-                        ((BitVector) vector).setNull(ordinal);
-                        ordinal++;
-                    } catch (RuntimeException e) {
-                        throw handleRuntimeException(e);
-                    }
-                }
-            }
-        }
-        triplesRead += valsRead;
+        vectorizedValuesReader.readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
+            nullabilityHolder, bytesReader);
+        triplesRead += actualBatchSize;
         this.hasNext = triplesRead < triplesCount;
         return actualBatchSize;
     }
@@ -514,17 +315,18 @@ public class BatchedPageIterator {
             }
             this.bytesReader = dataEncoding.getDictionaryBasedValuesReader(desc, VALUES, dict); */
         } else {
-            if (ParquetUtil.isVarWidthType(desc) || ParquetUtil.isBooleanType(desc)) {
-                this.valuesReader = dataEncoding.getValuesReader(desc, VALUES);
-                try {
-                    valuesReader.initFromPage(valueCount, in);
-                } catch (IOException e) {
-                    throw new ParquetDecodingException("could not read page in col " + desc, e);
-                }
-            } else {
+            //if (ParquetUtil.isVarWidthType(desc) || ParquetUtil.isBooleanType(desc)) {
+            // if (ParquetUtil.isBooleanType(desc)) {
+            //     this.valuesReader = dataEncoding.getValuesReader(desc, VALUES);
+            //     try {
+            //         valuesReader.initFromPage(valueCount, in);
+            //     } catch (IOException e) {
+            //         throw new ParquetDecodingException("could not read page in col " + desc, e);
+            //     }
+            // } else {
                 this.bytesReader = new BytesReader();
                 bytesReader.initFromPage(valueCount, in);
-            }
+            //}
         }
 
         //    if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
@@ -543,18 +345,16 @@ public class BatchedPageIterator {
     private void initFromPage(DataPageV1 page) {
         this.triplesCount = page.getValueCount();
         ValuesReader rlReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
-        ValuesReader dlReader = page.getDlEncoding().getValuesReader(desc, DEFINITION_LEVEL);
+        int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+        ValuesReader dlReader = this.vectorizedValuesReader = new VectorizedValuesReader(
+            bitWidth, desc.getMaxDefinitionLevel());
         this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
         this.definitionLevels = new ValuesReaderIntIterator(dlReader);
         try {
             BytesInput bytes = page.getBytes();
-            LOG.debug("page size {} bytes and {} records", bytes.size(), triplesCount);
-            LOG.debug("reading repetition levels at 0");
             ByteBufferInputStream in = bytes.toInputStream();
             rlReader.initFromPage(triplesCount, in);
-            LOG.debug("reading definition levels at {}", in.position());
             dlReader.initFromPage(triplesCount, in);
-            LOG.debug("reading data at {}", in.position());
             initDataReader(page.getValueEncoding(), in, page.getValueCount());
         } catch (IOException e) {
             throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
@@ -568,6 +368,11 @@ public class BatchedPageIterator {
         LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
         try {
             initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
+            int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+            this.vectorizedValuesReader = new VectorizedValuesReader(bitWidth, false,
+                    desc.getMaxDefinitionLevel());
+            vectorizedValuesReader.initFromPage(triplesCount, page.getDefinitionLevels().toInputStream());
+
         } catch (IOException e) {
             throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
         }
@@ -628,5 +433,4 @@ public class BatchedPageIterator {
             return 0;
         }
     }
-
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java
index 5df8a3c..6bc4756 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java
@@ -31,6 +31,9 @@ import org.apache.parquet.io.ParquetDecodingException;
  */
 public class BytesReader extends ValuesReader {
   private ByteBufferInputStream in = null;
+  // Only used for booleans.
+  private int bitOffset;
+  private byte currentByte = 0;
 
   public BytesReader() {
   }
@@ -52,5 +55,33 @@ public class BytesReader extends ValuesReader {
       throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
     }
   }
+
+  @Override
+  public final int readInteger() {
+    return getBuffer(4).getInt();
+  }
+
+  @Override
+  public final boolean readBoolean() {
+    if (bitOffset == 0) {
+      currentByte = getByte();
+    }
+
+    boolean v = (currentByte & (1 << bitOffset)) != 0;
+    bitOffset += 1;
+    if (bitOffset == 8) {
+      bitOffset = 0;
+    }
+    return v;
+  }
+
+  private byte getByte() {
+    try {
+      return (byte) in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+  }
+
 }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java
index 5363db8..36be507 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java
@@ -3,11 +3,10 @@ package org.apache.iceberg.parquet;
 import java.lang.reflect.Array;
 import java.util.List;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.schema.Type;
-import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,23 +36,22 @@ public class ColumnarBatchReader implements BatchedReader{
 
     public final ColumnarBatch read(ColumnarBatch ignore) {
 
-        ArrowColumnVector[] arrowVectorArr = (ArrowColumnVector[]) Array.newInstance(ArrowColumnVector.class,
-                readers.length);
+        IcebergArrowColumnVector[] icebergArrowColumnVectors = (IcebergArrowColumnVector[]) Array.newInstance(IcebergArrowColumnVector.class,
+            readers.length);
 
         int numRows = 0;
         for (int i = 0; i < readers.length; i += 1) {
-
-            FieldVector vec = readers[i].read();
-            arrowVectorArr[i] = new ArrowColumnVector(vec);
-            if (i > 0) {
-                Preconditions.checkState(numRows == vec.getValueCount(),
-                    "Different number of values returned by readers for columns: " +
-                        readers[i - 1] + " and " + readers[i]);
+            NullabilityHolder nullabilityHolder = new NullabilityHolder(readers[i].batchSize());
+            FieldVector vec = readers[i].read(nullabilityHolder);
+            icebergArrowColumnVectors[i] = new IcebergArrowColumnVector(vec, nullabilityHolder);
+            if (i > 0 && numRows != vec.getValueCount()) {
+                throw new IllegalStateException("Different number of values returned by readers" +
+                    "for columns " + readers[i - 1] + " and " + readers[i]);
             }
             numRows = vec.getValueCount();
         }
 
-        ColumnarBatch batch = new ColumnarBatch(arrowVectorArr);
+        ColumnarBatch batch = new ColumnarBatch(icebergArrowColumnVectors);
         batch.setNumRows(numRows);
 
         return batch;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
new file mode 100644
index 0000000..dbffdb0
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+public class NullabilityHolder {
+  private final boolean[] isNull;
+  private int numNulls;
+
+  public NullabilityHolder(int batchSize) {
+    this.isNull = new boolean[batchSize];
+  }
+
+
+  public void setNull(int idx) {
+    isNull[idx] =  true;
+    numNulls++;
+  }
+
+  public void setNulls(int idx, int num) {
+    int i = 0;
+    while (i < num) {
+      isNull[idx] = true;
+      numNulls++;
+      idx++;
+      i++;
+    }
+  }
+
+  public boolean isNullAt(int idx) {
+    return isNull[idx];
+  }
+
+  public boolean hasNulls() {
+    return numNulls > 0;
+  }
+
+  public int numNulls() {
+    return numNulls;
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 7267259..f3cbb42 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -30,11 +30,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.IntVector;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.MetricsModes;
 import org.apache.iceberg.MetricsModes.MetricsMode;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowSchemaUtil;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.InputFile;
@@ -284,4 +288,62 @@ public class ParquetUtil {
     return false;
   }
 
+  public static boolean isNumericNonDecimalType(ColumnDescriptor desc) {
+    PrimitiveType primitive = desc.getPrimitiveType();
+    OriginalType originalType = primitive.getOriginalType();
+    if (originalType != null) {
+      if (originalType == INT_8 || originalType == INT_16 || originalType == INT_32
+          || originalType == DATE || originalType == INT_64 || originalType == TIMESTAMP_MILLIS
+          || originalType == TIMESTAMP_MICROS) {
+        return true;
+      }
+    } else {
+      PrimitiveType.PrimitiveTypeName primitiveTypeName = primitive.getPrimitiveTypeName();
+      if (primitiveTypeName == INT64 || primitiveTypeName == INT32 || primitiveTypeName == FLOAT ||
+          primitiveTypeName == DOUBLE) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static boolean isIntType(ColumnDescriptor desc) {
+    PrimitiveType primitive = desc.getPrimitiveType();
+    OriginalType originalType = primitive.getOriginalType();
+    if (originalType != null && (originalType ==  INT_8 || originalType == INT_16 || originalType == INT_32 || originalType == DATE)) {
+      return true;
+    } else if (primitive.getPrimitiveTypeName() == INT32) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isLongType(ColumnDescriptor desc) {
+    PrimitiveType primitive = desc.getPrimitiveType();
+    OriginalType originalType = primitive.getOriginalType();
+    if (originalType != null && (originalType ==  INT_64 || originalType == TIMESTAMP_MILLIS || originalType == TIMESTAMP_MICROS)) {
+      return true;
+    } else if (primitive.getPrimitiveTypeName() == INT64) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isDoubleType(ColumnDescriptor desc) {
+    PrimitiveType primitive = desc.getPrimitiveType();
+    OriginalType originalType = primitive.getOriginalType();
+    if (originalType == null && primitive.getPrimitiveTypeName() == DOUBLE) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isFloatType(ColumnDescriptor desc) {
+    PrimitiveType primitive = desc.getPrimitiveType();
+    OriginalType originalType = primitive.getOriginalType();
+    if (originalType == null && primitive.getPrimitiveTypeName() == FLOAT) {
+      return true;
+    }
+    return false;
+  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java
index 0915e1d..f34d574 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java
@@ -22,6 +22,9 @@ package org.apache.iceberg.parquet;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.*;
 import org.apache.iceberg.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergDecimalArrowVector;
+import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarBinaryArrowVector;
+import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarcharArrowVector;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
@@ -44,154 +47,188 @@ import org.apache.parquet.schema.PrimitiveType;
  *   icebergField : DECIMAL   -   Field Vector Type : org.apache.arrow.vector.DecimalVector
  */
 public class VectorReader implements BatchedReader {
-    public static final int DEFAULT_NUM_ROWS_IN_BATCH = 10000;
-    public static final int UNKNOWN_WIDTH = -1;
+  public static final int DEFAULT_BATCH_SIZE = 5000;
+  public static final int UNKNOWN_WIDTH = -1;
 
-    private final ColumnDescriptor columnDescriptor;
-    private FieldVector vec;
-    private final int rowsInBatch;
-    private final BatchedColumnIterator batchedColumnIterator;
-    private final int typeWidth;
-    private final boolean isFixedLengthDecimal;
-    private final boolean isVarWidthType;
-    private final boolean isFixedWidthBinary;
-    private final boolean isBooleanType;
-    private final boolean isPaddedDecimal;
+  private final ColumnDescriptor columnDescriptor;
+  private FieldVector vec;
+  private final int batchSize;
+  private final BatchedColumnIterator batchedColumnIterator;
+  private final int typeWidth;
+  private final boolean isFixedLengthDecimal;
+  private final boolean isVarWidthType;
+  private final boolean isFixedWidthBinary;
+  private final boolean isBooleanType;
+  private final boolean isPaddedDecimal;
+  private final boolean isIntType;
+  private final boolean isLongType;
+  private final boolean isFloatType;
+  private final boolean isDoubleType;
 
+  // This value is copied from Arrow's BaseVariableWidthVector. We may need to change
+  // this value if Arrow ends up changing this default.
+  private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
 
-    public VectorReader(ColumnDescriptor desc,
-                        Types.NestedField icebergField,
-                        BufferAllocator rootAlloc,
-                        int rowsInBatch) {
-        this.rowsInBatch = (rowsInBatch == 0) ? DEFAULT_NUM_ROWS_IN_BATCH : rowsInBatch;
-        this.columnDescriptor = desc;
-        this.typeWidth = allocateFieldVector(rootAlloc, icebergField, desc);
+  public VectorReader(
+      ColumnDescriptor desc,
+      Types.NestedField icebergField,
+      BufferAllocator rootAlloc,
+      int batchSize) {
+    this.batchSize = (batchSize == 0) ? DEFAULT_BATCH_SIZE : batchSize;
+    this.columnDescriptor = desc;
+    this.typeWidth = allocateFieldVector(rootAlloc, icebergField, desc);
+    this.isFixedLengthDecimal = ParquetUtil.isFixedLengthDecimal(desc);
+    this.isVarWidthType = ParquetUtil.isVarWidthType(desc);
+    this.isFixedWidthBinary = ParquetUtil.isFixedWidthBinary(desc);
+    this.isBooleanType = ParquetUtil.isBooleanType(desc);
+    this.isPaddedDecimal = ParquetUtil.isIntLongBackedDecimal(desc);
+    this.isIntType = ParquetUtil.isIntType(desc);
+    this.isLongType = ParquetUtil.isLongType(desc);
+    this.isFloatType = ParquetUtil.isFloatType(desc);
+    this.isDoubleType = ParquetUtil.isDoubleType(desc);
+    this.batchedColumnIterator = new BatchedColumnIterator(desc, "", batchSize);
+    //this.nullabilityHolder = new NullabilityHolder(this.batchSize);
+  }
 
-        isFixedLengthDecimal = ParquetUtil.isFixedLengthDecimal(desc);
-        isVarWidthType = ParquetUtil.isVarWidthType(desc);
-        isFixedWidthBinary = ParquetUtil.isFixedWidthBinary(desc);
-        isBooleanType = ParquetUtil.isBooleanType(desc);
-        isPaddedDecimal = ParquetUtil.isIntLongBackedDecimal(desc);
+  private int allocateFieldVector(BufferAllocator rootAlloc, Types.NestedField icebergField, ColumnDescriptor desc) {
 
-        this.batchedColumnIterator = new BatchedColumnIterator(desc, "", rowsInBatch);
+    PrimitiveType primitive = desc.getPrimitiveType();
+    if (primitive.getOriginalType() != null) {
+      switch (desc.getPrimitiveType().getOriginalType()) {
+        case ENUM:
+        case JSON:
+        case UTF8:
+        case BSON:
+          this.vec = new IcebergVarcharArrowVector(icebergField.name(), rootAlloc);
+          vec.setInitialCapacity(batchSize * 10);
+          //TODO: samarth use the uncompressed page size info here
+          vec.allocateNewSafe();
+          return UNKNOWN_WIDTH;
+        case INT_8:
+        case INT_16:
+        case INT_32:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((IntVector) vec).allocateNew(batchSize);
+          return IntVector.TYPE_WIDTH;
+        case DATE:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((DateDayVector) vec).allocateNew(batchSize);
+          return IntVector.TYPE_WIDTH;
+        case INT_64:
+        case TIMESTAMP_MILLIS:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((BigIntVector) vec).allocateNew(batchSize);
+          return BigIntVector.TYPE_WIDTH;
+        case TIMESTAMP_MICROS:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((TimeStampMicroTZVector) vec).allocateNew(batchSize);
+          return BigIntVector.TYPE_WIDTH;
+        case DECIMAL:
+          DecimalMetadata decimal = primitive.getDecimalMetadata();
+          this.vec = new IcebergDecimalArrowVector(icebergField.name(), rootAlloc, decimal.getPrecision(),
+              decimal.getScale());
+          ((DecimalVector) vec).allocateNew(batchSize);
+          switch (primitive.getPrimitiveTypeName()) {
+            case BINARY:
+            case FIXED_LEN_BYTE_ARRAY:
+              return primitive.getTypeLength();
+            case INT64:
+              return BigIntVector.TYPE_WIDTH;
+            case INT32:
+              return IntVector.TYPE_WIDTH;
+            default:
+              throw new UnsupportedOperationException(
+                  "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+          }
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported logical type: " + primitive.getOriginalType());
+      }
+    } else {
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+          int len = ((Types.FixedType) icebergField.type()).length();
+          this.vec = new IcebergVarBinaryArrowVector(icebergField.name(), rootAlloc);
+          int factor = (len + DEFAULT_RECORD_BYTE_COUNT - 1) / (DEFAULT_RECORD_BYTE_COUNT);
+          vec.setInitialCapacity(batchSize * factor);
+          vec.allocateNew();
+          return len;
+        case BINARY:
+          this.vec = new IcebergVarBinaryArrowVector(icebergField.name(), rootAlloc);
+          vec.setInitialCapacity(batchSize * 10);
+          //TODO: samarth use the uncompressed page size info here
+          vec.allocateNewSafe();
+          return UNKNOWN_WIDTH;
+        case INT32:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((IntVector) vec).allocateNew(batchSize);
+          return IntVector.TYPE_WIDTH;
+        case FLOAT:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((Float4Vector) vec).allocateNew(batchSize);
+          return Float4Vector.TYPE_WIDTH;
+        case BOOLEAN:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((BitVector) vec).allocateNew(batchSize);
+          return UNKNOWN_WIDTH;
+        case INT64:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((BigIntVector) vec).allocateNew(batchSize);
+          return BigIntVector.TYPE_WIDTH;
+        case DOUBLE:
+          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          ((Float8Vector) vec).allocateNew(batchSize);
+          return Float8Vector.TYPE_WIDTH;
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
     }
+  }
 
-    private int allocateFieldVector(BufferAllocator rootAlloc, Types.NestedField icebergField, ColumnDescriptor desc) {
-
-        PrimitiveType primitive = desc.getPrimitiveType();
-        if (primitive.getOriginalType() != null) {
-            switch (desc.getPrimitiveType().getOriginalType()) {
-                case ENUM:
-                case JSON:
-                case UTF8:
-                case BSON:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    return UNKNOWN_WIDTH;
-                case INT_8:
-                case INT_16:
-                case INT_32:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((IntVector) vec).allocateNew(rowsInBatch * IntVector.TYPE_WIDTH);
-                    return IntVector.TYPE_WIDTH;
-                case DATE:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((DateDayVector) vec).allocateNew(rowsInBatch * IntVector.TYPE_WIDTH);
-                    return IntVector.TYPE_WIDTH;
-                case INT_64:
-                case TIMESTAMP_MILLIS:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((BigIntVector) vec).allocateNew(rowsInBatch * BigIntVector.TYPE_WIDTH);
-                    return BigIntVector.TYPE_WIDTH;
-                case TIMESTAMP_MICROS:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((TimeStampMicroTZVector) vec).allocateNew(rowsInBatch * BigIntVector.TYPE_WIDTH);
-                    return BigIntVector.TYPE_WIDTH;
-                case DECIMAL:
-                    DecimalMetadata decimal = primitive.getDecimalMetadata();
-                    this.vec = new DecimalVector(icebergField.name(), rootAlloc, decimal.getPrecision(), decimal.getScale());
-                    ((DecimalVector) vec).allocateNew(rowsInBatch * DecimalVector.TYPE_WIDTH);
-                    switch (primitive.getPrimitiveTypeName()) {
-                        case BINARY:
-                        case FIXED_LEN_BYTE_ARRAY:
-                            return primitive.getTypeLength();
-                        case INT64:
-                            return BigIntVector.TYPE_WIDTH;
-                        case INT32:
-                            return IntVector.TYPE_WIDTH;
-                        default:
-                            throw new UnsupportedOperationException(
-                                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
-                    }
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unsupported logical type: " + primitive.getOriginalType());
-            }
-        } else {
-            switch (primitive.getPrimitiveTypeName()) {
-                case FIXED_LEN_BYTE_ARRAY:
-                   this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    int len = ((Types.FixedType) icebergField.type()).length();
-                    vec.setInitialCapacity(rowsInBatch * len);
-                    vec.allocateNew();
-                    return len;
-                case BINARY:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    vec.setInitialCapacity(rowsInBatch);
-                    vec.allocateNew();
-                    return UNKNOWN_WIDTH;
-                case INT32:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((IntVector) vec).allocateNew(rowsInBatch * IntVector.TYPE_WIDTH);
-                    return IntVector.TYPE_WIDTH;
-                case FLOAT:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((Float4Vector) vec).allocateNew(rowsInBatch * Float4Vector.TYPE_WIDTH);
-                    return Float4Vector.TYPE_WIDTH;
-                case BOOLEAN:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((BitVector) vec).allocateNew(rowsInBatch);
-                    return UNKNOWN_WIDTH;
-                case INT64:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((BigIntVector) vec).allocateNew(rowsInBatch * BigIntVector.TYPE_WIDTH);
-                    return BigIntVector.TYPE_WIDTH;
-                case DOUBLE:
-                    this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-                    ((Float8Vector) vec).allocateNew(rowsInBatch * Float8Vector.TYPE_WIDTH);
-                    return Float8Vector.TYPE_WIDTH;
-                default:
-                    throw new UnsupportedOperationException("Unsupported type: " + primitive);
-            }
+  public FieldVector read(NullabilityHolder nullabilityHolder) {
+    vec.setValueCount(0);
+    if (batchedColumnIterator.hasNext()) {
+      if (isFixedLengthDecimal) {
+        batchedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
+        ((IcebergDecimalArrowVector) vec).setNullabilityHolder(nullabilityHolder);
+      } else if (isFixedWidthBinary) {
+        batchedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth, nullabilityHolder);
+      } else if (isVarWidthType) {
+        if (vec instanceof IcebergVarcharArrowVector) {
+          ((IcebergVarcharArrowVector) vec).setNullabilityHolder(nullabilityHolder);
+        } else if (vec instanceof IcebergVarBinaryArrowVector) {
+          ((IcebergVarBinaryArrowVector) vec).setNullabilityHolder(nullabilityHolder);
         }
+        batchedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
+      } else if (isBooleanType) {
+        batchedColumnIterator.nextBatchBoolean(vec, nullabilityHolder);
+      } else if (isPaddedDecimal) {
+        ((IcebergDecimalArrowVector) vec).setNullabilityHolder(nullabilityHolder);
+        batchedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
+      } else if (isIntType) {
+        batchedColumnIterator.nextBatchIntegers(vec, typeWidth, nullabilityHolder);
+      } else if (isLongType) {
+        batchedColumnIterator.nextBatchLongs(vec, typeWidth, nullabilityHolder);
+      } else if (isFloatType) {
+        batchedColumnIterator.nextBatchFloats(vec, typeWidth, nullabilityHolder);
+      } else if (isDoubleType) {
+        batchedColumnIterator.nextBatchDoubles(vec, typeWidth, nullabilityHolder);
+      }
     }
+    return vec;
+  }
 
-    public FieldVector read() {
-        if (batchedColumnIterator.hasNext()) {
-            if (isFixedLengthDecimal) {
-                batchedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth);
-            } else if (isVarWidthType) {
-                batchedColumnIterator.nextBatchVarWidthType(vec);
-            } else if (isFixedWidthBinary) {
-                vec.reset();
-                batchedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth);
-            } else if (isBooleanType) {
-                batchedColumnIterator.nextBatchBoolean(vec);
-            } else if (isPaddedDecimal) {
-                batchedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth);
-            } else {
-                batchedColumnIterator.nextBatchNumericNonDecimal(vec, typeWidth);
-            }
-        }
-        return vec;
-    }
+  public void setPageSource(PageReadStore source) {
+    batchedColumnIterator.setPageSource(source);
+  }
 
-    public void setPageSource(PageReadStore source) {
-        batchedColumnIterator.setPageSource(source);
-    }
+  @Override
+  public String toString() {
+    return columnDescriptor.toString();
+  }
 
-    @Override
-    public String toString() {
-        return columnDescriptor.toString();
-    }
+  public int batchSize() {
+    return batchSize;
+  }
 }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java
new file mode 100644
index 0000000..7bcd431
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import io.netty.buffer.ArrowBuf;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * A values reader for Parquet's run-length encoded data. This is based off of the version in
+ * parquet-mr with these changes:
+ * - Supports the vectorized interface.
+ * - Works on byte arrays(byte[]) instead of making byte streams.
+ * <p>
+ * This encoding is used in multiple places:
+ * - Definition/Repetition levels
+ * - Dictionary ids.
+ */
+public final class VectorizedValuesReader extends ValuesReader {
+  // Current decoding mode. The encoded data contains groups of either run length encoded data
+  // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
+  // the number of values in the group.
+  // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
+  private enum MODE {
+    RLE,
+    PACKED
+  }
+
+  // Encoded data.
+  private ByteBufferInputStream in;
+
+  // bit/byte width of decoded data and utility to batch unpack them.
+  private int bitWidth;
+  private int bytesWidth;
+  private BytePacker packer;
+
+  // Current decoding mode and values
+  private MODE mode;
+  private int currentCount;
+  private int currentValue;
+
+  // Buffer of decoded values if the values are PACKED.
+  private int[] currentBuffer = new int[16];
+  private int currentBufferIdx = 0;
+
+  // If true, the bit width is fixed. This decoder is used in different places and this also
+  // controls if we need to read the bitwidth from the beginning of the data stream.
+  private final boolean fixedWidth;
+  private final boolean readLength;
+  private final int maxDefLevel;
+
+  public VectorizedValuesReader(
+      int bitWidth,
+      int maxDefLevel) {
+    this.fixedWidth = true;
+    this.readLength = bitWidth != 0;
+    this.maxDefLevel = maxDefLevel;
+    init(bitWidth);
+  }
+
+  public VectorizedValuesReader(
+      int bitWidth,
+      boolean readLength,
+      int maxDefLevel) {
+    this.fixedWidth = true;
+    this.readLength = readLength;
+    this.maxDefLevel = maxDefLevel;
+    init(bitWidth);
+  }
+
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    this.in = in;
+    if (fixedWidth) {
+      // initialize for repetition and definition levels
+      if (readLength) {
+        int length = readIntLittleEndian();
+        this.in = in.sliceStream(length);
+      }
+    } else {
+      // initialize for values
+      if (in.available() > 0) {
+        init(in.read());
+      }
+    }
+    if (bitWidth == 0) {
+      // 0 bit width, treat this as an RLE run of valueCount number of 0's.
+      this.mode = MODE.RLE;
+      this.currentCount = valueCount;
+      this.currentValue = 0;
+    } else {
+      this.currentCount = 0;
+    }
+  }
+
+  /**
+   * Initializes the internal state for decoding ints of `bitWidth`.
+   */
+  private void init(int bitWidth) {
+    Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+    this.bitWidth = bitWidth;
+    this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
+    this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+  }
+
+  /**
+   * Reads the next varint encoded int.
+   */
+  private int readUnsignedVarInt() throws IOException {
+    int value = 0;
+    int shift = 0;
+    int b;
+    do {
+      b = in.read();
+      value |= (b & 0x7F) << shift;
+      shift += 7;
+    } while ((b & 0x80) != 0);
+    return value;
+  }
+
+  /**
+   * Reads the next 4 byte little endian int.
+   */
+  private int readIntLittleEndian() throws IOException {
+    int ch4 = in.read();
+    int ch3 = in.read();
+    int ch2 = in.read();
+    int ch1 = in.read();
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+
+  /**
+   * Reads the next byteWidth little endian int.
+   */
+  private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
+    switch (bytesWidth) {
+      case 0:
+        return 0;
+      case 1:
+        return in.read();
+      case 2: {
+        int ch2 = in.read();
+        int ch1 = in.read();
+        return (ch1 << 8) + ch2;
+      }
+      case 3: {
+        int ch3 = in.read();
+        int ch2 = in.read();
+        int ch1 = in.read();
+        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+      }
+      case 4: {
+        return readIntLittleEndian();
+      }
+    }
+    throw new RuntimeException("Unreachable");
+  }
+
+  private int ceil8(int value) {
+    return (value + 7) / 8;
+  }
+
+  /**
+   * Reads the next group.
+   */
+  private void readNextGroup() {
+    try {
+      int header = readUnsignedVarInt();
+      this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+      switch (mode) {
+        case RLE:
+          this.currentCount = header >>> 1;
+          this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+          return;
+        case PACKED:
+          int numGroups = header >>> 1;
+          this.currentCount = numGroups * 8;
+
+          if (this.currentBuffer.length < this.currentCount) {
+            this.currentBuffer = new int[this.currentCount];
+          }
+          currentBufferIdx = 0;
+          int valueIndex = 0;
+          while (valueIndex < this.currentCount) {
+            // values are bit packed 8 at a time, so reading bitWidth will always work
+            ByteBuffer buffer = in.slice(bitWidth);
+            this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
+            valueIndex += 8;
+          }
+          return;
+        default:
+          throw new ParquetDecodingException("not a valid mode " + this.mode);
+      }
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read from input stream", e);
+    }
+  }
+
+  @Override
+  public boolean readBoolean() {
+    return this.readInteger() != 0;
+  }
+
+  @Override
+  public void skip() {
+    this.readInteger();
+  }
+
+  @Override
+  public int readValueDictionaryId() {
+    return readInteger();
+  }
+
+  @Override
+  public int readInteger() {
+    if (this.currentCount == 0) {
+      this.readNextGroup();
+    }
+
+    this.currentCount--;
+    switch (mode) {
+      case RLE:
+        return this.currentValue;
+      case PACKED:
+        return this.currentBuffer[currentBufferIdx++];
+    }
+    throw new RuntimeException("Unreachable");
+  }
+
+  public void readBatchOfIntegers(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    ArrowBuf validityBuffer = vector.getValidityBuffer();
+    ArrowBuf dataBuffer = vector.getDataBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          bufferIdx =
+              fillFixWidthValueBuffer(
+                  typeWidth,
+                  maxDefLevel,
+                  nullabilityHolder,
+                  valuesReader,
+                  bufferIdx,
+                  dataBuffer,
+                  n);
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
+              //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
+              dataBuffer.setInt(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth).getInt());
+              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+              bufferIdx++;
+            } else {
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  public void readBatchOfLongs(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    ArrowBuf validityBuffer = vector.getValidityBuffer();
+    ArrowBuf dataBuffer = vector.getDataBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          bufferIdx =
+              fillFixWidthValueBuffer(
+                  typeWidth,
+                  maxDefLevel,
+                  nullabilityHolder,
+                  valuesReader,
+                  bufferIdx,
+                  dataBuffer,
+                  n);
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
+              //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
+              dataBuffer.setLong(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth).getLong());
+              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+              bufferIdx++;
+            } else {
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  public void readBatchOfFloats(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    ArrowBuf validityBuffer = vector.getValidityBuffer();
+    ArrowBuf dataBuffer = vector.getDataBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          bufferIdx =
+              fillFixWidthValueBuffer(
+                  typeWidth,
+                  maxDefLevel,
+                  nullabilityHolder,
+                  valuesReader,
+                  bufferIdx,
+                  dataBuffer,
+                  n);
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
+              //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
+              dataBuffer.setFloat(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth).getFloat());
+              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+              bufferIdx++;
+            } else {
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  public void readBatchOfDoubles(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    ArrowBuf validityBuffer = vector.getValidityBuffer();
+    ArrowBuf dataBuffer = vector.getDataBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          bufferIdx =
+              fillFixWidthValueBuffer(
+                  typeWidth,
+                  maxDefLevel,
+                  nullabilityHolder,
+                  valuesReader,
+                  bufferIdx,
+                  dataBuffer,
+                  n);
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
+              //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
+              dataBuffer.setDouble(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth).getDouble());
+              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+              bufferIdx++;
+            } else {
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  public void readBatchOfFixedWidthBinary(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    ArrowBuf validityBuffer = vector.getValidityBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            // for (int i = 0; i < n; i++) {
+            //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+            //   validityBufferIdx++;
+            // }
+            for (int i = 0; i < n; i++) {
+              bufferIdx = setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+            }
+          } else {
+            for (int i = 0; i < n; i++) {
+              //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              bufferIdx = setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+            } else {
+              //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  public void readBatchOfFixedLengthDecimals(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    //ArrowBuf validityBuffer = vector.getValidityBuffer();
+    //ArrowBuf dataBuffer = vector.getDataBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            // for (int i = 0; i < n; i++) {
+            //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+            //   validityBufferIdx++;
+            // }
+            for (int i = 0; i < n; i++) {
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              //bytesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+              bufferIdx++;
+            }
+          } else {
+            for (int i = 0; i < n; i++) {
+              //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+              bufferIdx++;
+            } else {
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  /**
+   * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
+   * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
+   * vector. It appropriately sets the validity buffer in the Arrow vector.
+   */
+  public void readBatchVarWidth(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    ArrowBuf dataBuffer = vector.getDataBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            // for (int i = 0; i < n; i++) {
+            //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+            //   validityBufferIdx++;
+            // }
+            for (int i = 0; i < n; i++) {
+              int len = valuesReader.readInteger();
+              ByteBuffer buffer = valuesReader.getBuffer(len);
+              ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
+              dataBuffer.writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
+              bufferIdx++;
+            }
+          } else {
+            //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+            nullabilityHolder.setNulls(bufferIdx, n);
+            bufferIdx += n;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              int len = valuesReader.readInteger();
+              ByteBuffer buffer = valuesReader.getBuffer(len);
+              ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
+              dataBuffer.writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
+              bufferIdx++;
+            } else {
+              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  public void readBatchOfIntLongBackedDecimals(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    ArrowBuf validityBuffer = vector.getValidityBuffer();
+    ArrowBuf dataBuffer = vector.getDataBuffer();
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+              dataBuffer.setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+              bufferIdx++;
+            }
+          } else {
+            nullabilityHolder.setNulls(bufferIdx, n);
+            bufferIdx += n;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+              dataBuffer.setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+              bufferIdx++;
+              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+            } else {
+              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  public void readBatchOfBooleans(
+      final FieldVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
+              ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
+              bufferIdx++;
+            }
+          } else {
+            for (int i = 0; i < n; i++) {
+              ((BitVector) vector).setNull(bufferIdx);
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
+              ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
+              bufferIdx++;
+            } else {
+              ((BitVector) vector).setNull(bufferIdx);
+              nullabilityHolder.setNull(bufferIdx);
+              bufferIdx++;
+            }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  private int setBinaryInVector(VarBinaryVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
+    byte[] byteArray = new byte[typeWidth];
+    valuesReader.getBuffer(typeWidth).get(byteArray);
+    vector.setSafe(bufferIdx, byteArray);
+    bufferIdx++;
+    return bufferIdx;
+  }
+
+  private int fillFixWidthValueBuffer(
+      int typeWidth, int maxDefLevel, NullabilityHolder nullabilityHolder,
+      BytesReader valuesReader, int bufferIdx, ArrowBuf dataBuffer, int n) {
+    if (currentValue == maxDefLevel) {
+      // for (int i = 0; i < n; i++) {
+      //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+      //   validityBufferIdx++;
+      // }
+      ByteBuffer buffer = valuesReader.getBuffer(n * typeWidth);
+      dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
+      bufferIdx += n;
+    } else {
+      for (int i = 0; i < n; i++) {
+        //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
+        nullabilityHolder.setNull(bufferIdx);
+        bufferIdx++;
+      }
+    }
+    return bufferIdx;
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
new file mode 100644
index 0000000..f8552ea
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.iceberg.parquet.NullabilityHolder;
+import org.apache.spark.sql.execution.arrow.ArrowUtils;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Implementation of Spark's {@link ColumnVector} interface. The main purpose
+ * of this class is to prevent the expensive nullability checks made by Spark's
+ * {@link ArrowColumnVector} implementation by delegating those calls to the
+ * Iceberg's {@link NullabilityHolder}.
+ */
+
+public class IcebergArrowColumnVector extends ColumnVector {
+
+  private final ArrowVectorAccessor accessor;
+  private final NullabilityHolder nullabilityHolder;
+  private ArrowColumnVector[] childColumns;
+
+  public IcebergArrowColumnVector(ValueVector vector, NullabilityHolder nulls) {
+    super(ArrowUtils.fromArrowField(vector.getField()));
+    this.nullabilityHolder = nulls;
+    this.accessor = getAccessor(vector);
+  }
+
+  @Override
+  public void close() {
+    if (childColumns != null) {
+      for (int i = 0; i < childColumns.length; i++) {
+        childColumns[i].close();
+        childColumns[i] = null;
+      }
+      childColumns = null;
+    }
+    accessor.close();
+  }
+
+  @Override
+  public boolean hasNull() {
+    return nullabilityHolder.hasNulls();
+  }
+
+  @Override
+  public int numNulls() {
+    return nullabilityHolder.numNulls();
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return nullabilityHolder.isNullAt(rowId);
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return accessor.getBoolean(rowId);
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return accessor.getByte(rowId);
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return accessor.getShort(rowId);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return accessor.getInt(rowId);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    return accessor.getLong(rowId);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    return accessor.getFloat(rowId);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    return accessor.getDouble(rowId);
+  }
+
+  @Override
+  public ColumnarArray getArray(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getArray(rowId);
+  }
+
+  @Override
+  public ColumnarMap getMap(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getDecimal(rowId, precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getUTF8String(rowId);
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getBinary(rowId);
+  }
+
+  @Override
+  public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
+
+  private abstract class ArrowVectorAccessor {
+
+    private final ValueVector vector;
+
+    ArrowVectorAccessor(ValueVector vector) {
+      this.vector = vector;
+    }
+
+    // TODO: should be final after removing ArrayAccessor workaround
+    boolean isNullAt(int rowId) {
+      return nullabilityHolder.isNullAt(rowId);
+    }
+
+    final void close() {
+      vector.close();
+    }
+
+    boolean getBoolean(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    byte getByte(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    short getShort(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    int getInt(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    long getLong(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    float getFloat(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    double getDouble(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    Decimal getDecimal(int rowId, int precision, int scale) {
+      throw new UnsupportedOperationException();
+    }
+
+    UTF8String getUTF8String(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    byte[] getBinary(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    ColumnarArray getArray(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private ArrowVectorAccessor getAccessor(ValueVector vector) {
+    if (vector instanceof BitVector) {
+      return new BooleanAccessor((BitVector) vector);
+    } else if (vector instanceof TinyIntVector) {
+      return new ByteAccessor((TinyIntVector) vector);
+    } else if (vector instanceof SmallIntVector) {
+      return new ShortAccessor((SmallIntVector) vector);
+    } else if (vector instanceof IntVector) {
+      return new IntAccessor((IntVector) vector);
+    } else if (vector instanceof BigIntVector) {
+      return new LongAccessor((BigIntVector) vector);
+    } else if (vector instanceof Float4Vector) {
+      return new FloatAccessor((Float4Vector) vector);
+    } else if (vector instanceof Float8Vector) {
+      return new DoubleAccessor((Float8Vector) vector);
+    } else if (vector instanceof IcebergDecimalArrowVector) {
+      return new DecimalAccessor((IcebergDecimalArrowVector) vector);
+    } else if (vector instanceof IcebergVarcharArrowVector) {
+      return new StringAccessor((IcebergVarcharArrowVector) vector);
+    } else if (vector instanceof IcebergVarBinaryArrowVector) {
+      return new BinaryAccessor((IcebergVarBinaryArrowVector) vector);
+    } else if (vector instanceof DateDayVector) {
+      return new DateAccessor((DateDayVector) vector);
+    } else if (vector instanceof TimeStampMicroTZVector) {
+      return new TimestampAccessor((TimeStampMicroTZVector) vector);
+    } else if (vector instanceof ListVector) {
+      ListVector listVector = (ListVector) vector;
+      return new ArrayAccessor(listVector);
+    } else if (vector instanceof StructVector) {
+      StructVector structVector = (StructVector) vector;
+      ArrowVectorAccessor accessor = new StructAccessor(structVector);
+      childColumns = new ArrowColumnVector[structVector.size()];
+      for (int i = 0; i < childColumns.length; ++i) {
+        childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
+      }
+      return accessor;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private class BooleanAccessor extends ArrowVectorAccessor {
+
+    private final BitVector vector;
+
+    BooleanAccessor(BitVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final boolean getBoolean(int rowId) {
+      return vector.get(rowId) == 1;
+    }
+  }
+
+  private class ByteAccessor extends ArrowVectorAccessor {
+
+    private final TinyIntVector vector;
+
+    ByteAccessor(TinyIntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final byte getByte(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class ShortAccessor extends ArrowVectorAccessor {
+
+    private final SmallIntVector vector;
+
+    ShortAccessor(SmallIntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final short getShort(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class IntAccessor extends ArrowVectorAccessor {
+
+    private final IntVector vector;
+
+    IntAccessor(IntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class LongAccessor extends ArrowVectorAccessor {
+
+    private final BigIntVector vector;
+
+    LongAccessor(BigIntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class FloatAccessor extends ArrowVectorAccessor {
+
+    private final Float4Vector vector;
+
+    FloatAccessor(Float4Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final float getFloat(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class DoubleAccessor extends ArrowVectorAccessor {
+
+    private final Float8Vector vector;
+
+    DoubleAccessor(Float8Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final double getDouble(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class DecimalAccessor extends ArrowVectorAccessor {
+
+    private final IcebergDecimalArrowVector vector;
+
+    DecimalAccessor(IcebergDecimalArrowVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      if (isNullAt(rowId)) return null;
+      return Decimal.apply(vector.getObject(rowId), precision, scale);
+    }
+  }
+
+  private class StringAccessor extends ArrowVectorAccessor {
+
+    private final IcebergVarcharArrowVector vector;
+    private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
+
+    StringAccessor(IcebergVarcharArrowVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final UTF8String getUTF8String(int rowId) {
+      vector.get(rowId, stringResult);
+      if (stringResult.isSet == 0) {
+        return null;
+      } else {
+        return UTF8String.fromAddress(null,
+            stringResult.buffer.memoryAddress() + stringResult.start,
+            stringResult.end - stringResult.start);
+      }
+    }
+  }
+
+  private class FixedSizeBinaryAccessor extends ArrowVectorAccessor {
+
+    private final FixedSizeBinaryVector vector;
+
+    FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final byte[] getBinary(int rowId) {
+      return vector.getObject(rowId);
+    }
+  }
+
+  private class BinaryAccessor extends ArrowVectorAccessor {
+
+    private final VarBinaryVector vector;
+
+    BinaryAccessor(VarBinaryVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final byte[] getBinary(int rowId) {
+      return vector.getObject(rowId);
+    }
+  }
+
+  private class DateAccessor extends ArrowVectorAccessor {
+
+    private final DateDayVector vector;
+
+    DateAccessor(DateDayVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class TimestampAccessor extends ArrowVectorAccessor {
+
+    private final TimeStampMicroTZVector vector;
+
+    TimestampAccessor(TimeStampMicroTZVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private class ArrayAccessor extends ArrowVectorAccessor {
+
+    private final ListVector vector;
+    private final ArrowColumnVector arrayData;
+
+    ArrayAccessor(ListVector vector) {
+      super(vector);
+      this.vector = vector;
+      this.arrayData = new ArrowColumnVector(vector.getDataVector());
+    }
+
+    @Override
+    final boolean isNullAt(int rowId) {
+      // TODO: Workaround if vector has all non-null values, see ARROW-1948
+      if (vector.getValueCount() > 0 && vector.getValidityBuffer().capacity() == 0) {
+        return false;
+      } else {
+        return super.isNullAt(rowId);
+      }
+    }
+
+    @Override
+    final ColumnarArray getArray(int rowId) {
+      ArrowBuf offsets = vector.getOffsetBuffer();
+      int index = rowId * ListVector.OFFSET_WIDTH;
+      int start = offsets.getInt(index);
+      int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
+      return new ColumnarArray(arrayData, start, end - start);
+    }
+  }
+
+  /**
+   * Any call to "get" method will throw UnsupportedOperationException.
+   *
+   * Access struct values in a ArrowColumnVector doesn't use this vector. Instead, it uses
+   * getStruct() method defined in the parent class. Any call to "get" method in this class is a
+   * bug in the code.
+   *
+   */
+  private class StructAccessor extends ArrowVectorAccessor {
+
+    StructAccessor(StructVector vector) {
+      super(vector);
+    }
+  }
+
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
new file mode 100644
index 0000000..1815414
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
@@ -0,0 +1,36 @@
+package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.iceberg.parquet.NullabilityHolder;
+
+/**
+ *
+ * Extension of Arrow's @{@link DecimalVector}. The whole reason of having
+ * this implementation is to override the expensive {@link DecimalVector#isSet(int)} method
+ * used by  {@link DecimalVector#getObject(int)}.
+ */
+public class IcebergDecimalArrowVector extends DecimalVector {
+  private NullabilityHolder nullabilityHolder;
+
+  public IcebergDecimalArrowVector(
+      String name,
+      BufferAllocator allocator, int precision, int scale) {
+    super(name, allocator, precision, scale);
+  }
+
+  /**
+   * Same as {@link #isNull(int)}.
+   *
+   * @param index position of element
+   * @return 1 if element at given index is not null, 0 otherwise
+   */
+  @Override
+  public int isSet(int index) {
+    return nullabilityHolder.isNullAt(index) ? 0 : 1;
+  }
+
+  public void setNullabilityHolder(NullabilityHolder nullabilityHolder) {
+    this.nullabilityHolder = nullabilityHolder;
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
new file mode 100644
index 0000000..7115564
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.iceberg.parquet.NullabilityHolder;
+
+/**
+ * Extension of Arrow's @{@link VarBinaryVector}. The whole reason of having
+ * this implementation is to override the expensive {@link VarBinaryVector#isSet(int)} method.
+ */
+public class IcebergVarBinaryArrowVector extends VarBinaryVector {
+  private NullabilityHolder nullabilityHolder;
+
+  public IcebergVarBinaryArrowVector(
+      String name,
+      BufferAllocator allocator) {
+    super(name, allocator);
+  }
+
+  /**
+   * Same as {@link #isNull(int)}.
+   *
+   * @param index position of element
+   * @return 1 if element at given index is not null, 0 otherwise
+   */
+  public int isSet(int index) {
+    return nullabilityHolder.isNullAt(index) ? 0 : 1;
+  }
+
+  public void setNullabilityHolder(NullabilityHolder nullabilityHolder) {
+    this.nullabilityHolder = nullabilityHolder;
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
new file mode 100644
index 0000000..2b7bc53
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.iceberg.parquet.NullabilityHolder;
+
+/**
+ * Extension of Arrow's @{@link VarCharVector}. The whole reason of having
+ * this implementation is to override the expensive {@link VarCharVector#isSet(int)} method.
+ */
+public class IcebergVarcharArrowVector extends VarCharVector {
+
+  private NullabilityHolder nullabilityHolder;
+
+  public IcebergVarcharArrowVector(
+      String name,
+      BufferAllocator allocator) {
+    super(name, allocator);
+  }
+
+  /**
+   * Same as {@link #isNull(int)}.
+   *
+   * @param index  position of element
+   * @return 1 if element at given index is not null, 0 otherwise
+   */
+  public int isSet(int index) {
+    return nullabilityHolder.isNullAt(index) ? 0 : 1;
+  }
+
+  public void setNullabilityHolder(NullabilityHolder nullabilityHolder) {
+    this.nullabilityHolder = nullabilityHolder;
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index db67521..5c58b0c 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -96,7 +96,7 @@ public abstract class IcebergSourceBenchmark {
   protected void setupSpark() {
     spark = SparkSession.builder()
         .config("spark.ui.enabled", false)
-            .config("parquet.enable.dictionary",false)
+            .config("parquet.enable.dictionary", false)
     .config(PARQUET_DICT_SIZE_BYTES, "1")
             .config("parquet.dictionary.page.size", "1")
         .master("local")
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
index ec0163a..1f81236 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
@@ -19,18 +19,17 @@
 
 package org.apache.iceberg.spark.source;
 
-        import com.google.common.collect.Maps;
-        import java.util.Map;
-        import org.apache.hadoop.conf.Configuration;
-        import org.apache.iceberg.PartitionSpec;
-        import org.apache.iceberg.Schema;
-        import org.apache.iceberg.Table;
-        import org.apache.iceberg.TableProperties;
-        import org.apache.iceberg.hadoop.HadoopTables;
-        import org.apache.iceberg.types.Types;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
 
-        import static org.apache.iceberg.types.Types.NestedField.optional;
-        import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.iceberg.types.Types.NestedField.optional;
 
 public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
 
@@ -42,10 +41,10 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
   @Override
   protected final Table initTable() {
     Schema schema = new Schema(
-            required(1, "longCol", Types.LongType.get()),
-            required(2, "intCol", Types.IntegerType.get()),
-            required(3, "floatCol", Types.FloatType.get()),
-            optional(4, "doubleCol", Types.DoubleType.get()),
+            optional(1, "longCol", Types.LongType.get()),
+            optional(2, "intCol", Types.LongType.get()),
+            optional(3, "floatCol", Types.LongType.get()),
+            optional(4, "doubleCol", Types.LongType.get()),
             optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
             optional(6, "dateCol", Types.DateType.get()),
             optional(7, "timestampCol", Types.TimestampType.withZone()),
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
deleted file mode 100644
index 01144ab..0000000
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source.parquet;
-
-import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.internal.SQLConf;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-
-import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
-import static org.apache.spark.sql.functions.current_date;
-import static org.apache.spark.sql.functions.date_add;
-import static org.apache.spark.sql.functions.expr;
-
-/**
- * A benchmark that evaluates the performance of reading Parquet data with a flat schema
- * using Iceberg and the built-in file source in Spark.
- *
- * To run this benchmark:
- * <code>
- *   ./gradlew :iceberg-spark:jmh
- *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
- *       -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
- * </code>
- */
-public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlatDataBenchmark {
-
-  private static final int NUM_FILES = 10;
-  private static final int NUM_ROWS = 10000000;
-
-  @Setup
-  public void setupBenchmark() {
-    setupSpark();
-    appendData();
-  }
-
-  @TearDown
-  public void tearDownBenchmark() throws IOException {
-    tearDownSpark();
-    cleanupFiles();
-  }
-
-//  @Benchmark
-//  @Threads(1)
-//  public void readIcebergVectorized100k() {
-//    Map<String, String> tableProperties = Maps.newHashMap();
-//    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-//    withTableProperties(tableProperties, () -> {
-//      String tableLocation = table().location();
-//      Dataset<Row> df = spark().read().format("iceberg")
-//          .option("iceberg.read.numrecordsperbatch", "100000")
-//          .load(tableLocation);
-//      materialize(df);
-//    });
-//  }
-//
-//  @Benchmark
-//  @Threads(1)
-//  public void readIcebergVectorized10k() {
-//    Map<String, String> tableProperties = Maps.newHashMap();
-//    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-//    withTableProperties(tableProperties, () -> {
-//      String tableLocation = table().location();
-//      Dataset<Row> df = spark().read().format("iceberg")
-//          .option("iceberg.read.numrecordsperbatch", "10000")
-//          .load(tableLocation);
-//      materialize(df);
-//    });
-//  }
-
-
-  @Benchmark
-  @Threads(1)
-  public void readIcebergVectorized100() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-              .option("iceberg.read.numrecordsperbatch", "100")
-              .load(tableLocation);
-      materialize(df);
-    });
-  }
-
-  @Benchmark
-  @Threads(1)
-  public void readIcebergVectorized5k() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "5000")
-          .load(tableLocation);
-      materialize(df);
-    });
-  }
-
-  @Benchmark
-  @Threads(1)
-  public void readFileSourceVectorized() {
-    Map<String, String> conf = Maps.newHashMap();
-    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
-    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
-    withSQLConf(conf, () -> {
-      Dataset<Row> df = spark().read().parquet(dataLocation());
-      materialize(df);
-    });
-  }
-
-  @Benchmark
-  @Threads(1)
-  public void readFileSourceNonVectorized() {
-    Map<String, String> conf = Maps.newHashMap();
-    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
-    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
-    withSQLConf(conf, () -> {
-      Dataset<Row> df = spark().read().parquet(dataLocation());
-      materialize(df);
-    });
-  }
-
-//  @Benchmark
-//  @Threads(1)
-//  public void readWithProjectionIcebergVectorized100k() {
-//    Map<String, String> tableProperties = Maps.newHashMap();
-//    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-//    withTableProperties(tableProperties, () -> {
-//      String tableLocation = table().location();
-//      Dataset<Row> df = spark().read().format("iceberg")
-//          .option("iceberg.read.numrecordsperbatch", "100000")
-//          .load(tableLocation).select("longCol");
-//      materialize(df);
-//    });
-//  }
-//
-//  @Benchmark
-//  @Threads(1)
-//  public void readWithProjectionIcebergVectorized10k() {
-//    Map<String, String> tableProperties = Maps.newHashMap();
-//    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-//    withTableProperties(tableProperties, () -> {
-//      String tableLocation = table().location();
-//      Dataset<Row> df = spark().read().format("iceberg")
-//          .option("iceberg.read.numrecordsperbatch", "10000")
-//          .load(tableLocation).select("longCol");
-//      materialize(df);
-//    });
-//  }
-
-
-//  @Benchmark
-//  @Threads(1)
-//  public void readWithProjectionIcebergVectorized5k() {
-//    Map<String, String> tableProperties = Maps.newHashMap();
-//    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-//    withTableProperties(tableProperties, () -> {
-//      String tableLocation = table().location();
-//      Dataset<Row> df = spark().read().format("iceberg")
-//          .option("iceberg.read.numrecordsperbatch", "5000")
-//          .load(tableLocation).select("longCol");
-//      materialize(df);
-//    });
-//  }
-//
-//  @Benchmark
-//  @Threads(1)
-//  public void readWithProjectionFileSourceVectorized() {
-//    Map<String, String> conf = Maps.newHashMap();
-//    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
-//    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
-//    withSQLConf(conf, () -> {
-//      Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
-//      materialize(df);
-//    });
-//  }
-
-//  @Benchmark
-//  @Threads(1)
-//  public void readWithProjectionFileSourceNonVectorized() {
-//    Map<String, String> conf = Maps.newHashMap();
-//    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
-//    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
-//    withSQLConf(conf, () -> {
-//      Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
-//      materialize(df);
-//    });
-//  }
-
-
-  @Benchmark
-  @Threads(1)
-  public void readIcebergVectorized1k() {
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
-    withTableProperties(tableProperties, () -> {
-      String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg")
-          .option("iceberg.read.numrecordsperbatch", "1000")
-          .load(tableLocation);
-      materialize(df);
-    });
-  }
-
-
-  private void appendData() {
-    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
-      Dataset<Row> df = spark().range(NUM_ROWS)
-          .withColumnRenamed("id", "longCol")
-          .withColumn("intCol", expr("CAST(longCol AS INT)"))
-          .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
-          .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
-          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
-          .withColumn("dateCol", date_add(current_date(), fileNum))
-          .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
-          .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
-      appendAsFile(df);
-    }
-  }
-}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
index ab62f53..3a886c1 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
@@ -32,8 +32,6 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 
-import static org.apache.spark.sql.functions.expr;
-
 /**
  * A benchmark that evaluates the performance of writing Parquet data with a flat schema
  * using Iceberg and the built-in file source in Spark.
@@ -78,13 +76,13 @@ public class IcebergSourceFlatParquetDataWriteBenchmark extends IcebergSourceFla
   private Dataset<Row> benchmarkData() {
     return spark().range(NUM_ROWS)
         .withColumnRenamed("id", "longCol")
-        .withColumn("intCol", expr("CAST(longCol AS INT)"))
-        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
-        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
-        .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
-        .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))"))
-        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
-        .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+        // .withColumn("intCol", expr("CAST(longCol AS INT)"))
+        // .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+        // .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+        // .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+        // .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))"))
+        // .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+        // .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
         .coalesce(1);
   }
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedIcebergSourceBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedIcebergSourceBenchmark.java
new file mode 100644
index 0000000..bf1fbc7
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedIcebergSourceBenchmark.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet.vectorized;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.spark.source.IcebergSourceBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+
+/**
+ * Parent class of the benchmarks that compare performance of performance of reading Parquet data with a
+ * flat schema using vectorized Iceberg read path and the built-in file source in Spark.
+ * <p>
+ * To run all the the benchmarks that extend this class:
+ * <code>
+ * ./gradlew :iceberg-spark:jmh
+ * -PjmhIncludeRegex=VectorizedRead*Benchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
+ * </code>
+ */
+
+public abstract class VectorizedIcebergSourceBenchmark extends IcebergSourceBenchmark {
+  static final int NUM_FILES = 10;
+  static final int NUM_ROWS = 10000000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+    appendData();
+    // Allow unsafe memory access to avoid the costly check arrow does to check if index is within bounds
+    System.setProperty("arrow.enable_unsafe_memory_access", "true");
+    // Disable expensive null check for every get(index) call.
+    // Iceberg manages nullability checks itself instead of relying on arrow.
+    System.setProperty("arrow.enable_null_check_for_get", "false");
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  protected Configuration initHadoopConf() {
+    return new Configuration();
+  }
+
+  protected abstract void appendData();
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized100() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "100")
+          .load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized1k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "1000")
+          .load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized5k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "5000")
+          .load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized10k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "10000")
+          .load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIcebergVectorized1k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "1000")
+          .load(tableLocation).select("longCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIcebergVectorized5k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "5000")
+          .load(tableLocation).select("longCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIcebergVectorized10k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "10000")
+          .load(tableLocation).select("longCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
+      materialize(df);
+    });
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFloatsBenchmark.java
similarity index 51%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFloatsBenchmark.java
index ec0163a..1511151 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFloatsBenchmark.java
@@ -17,39 +17,29 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.spark.source.parquet.vectorized;
 
-        import com.google.common.collect.Maps;
-        import java.util.Map;
-        import org.apache.hadoop.conf.Configuration;
-        import org.apache.iceberg.PartitionSpec;
-        import org.apache.iceberg.Schema;
-        import org.apache.iceberg.Table;
-        import org.apache.iceberg.TableProperties;
-        import org.apache.iceberg.hadoop.HadoopTables;
-        import org.apache.iceberg.types.Types;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
-        import static org.apache.iceberg.types.Types.NestedField.optional;
-        import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
 
-public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
-
-  @Override
-  protected Configuration initHadoopConf() {
-    return new Configuration();
-  }
+public class VectorizedReadFloatsBenchmark extends VectorizedIcebergSourceBenchmark {
 
   @Override
   protected final Table initTable() {
     Schema schema = new Schema(
-            required(1, "longCol", Types.LongType.get()),
-            required(2, "intCol", Types.IntegerType.get()),
-            required(3, "floatCol", Types.FloatType.get()),
-            optional(4, "doubleCol", Types.DoubleType.get()),
-            optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-            optional(6, "dateCol", Types.DateType.get()),
-            optional(7, "timestampCol", Types.TimestampType.withZone()),
-            optional(8, "stringCol", Types.StringType.get()));
+        optional(1, "longCol", Types.LongType.get()),
+        optional(2, "floatCol", Types.FloatType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = Maps.newHashMap();
@@ -57,4 +47,14 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
     properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
     return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
+
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol")
+          .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"));
+      appendAsFile(df);
+    }
+  }
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFloatsTwentyPercentNullBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFloatsTwentyPercentNullBenchmark.java
new file mode 100644
index 0000000..d4c1d41
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFloatsTwentyPercentNullBenchmark.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet.vectorized;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.pmod;
+import static org.apache.spark.sql.functions.when;
+
+public class VectorizedReadFloatsTwentyPercentNullBenchmark extends VectorizedReadFloatsBenchmark {
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn("longCol", when(pmod(col("id"), lit(2)).equalTo(lit(0)), lit(null)).otherwise(col("id")))
+          .drop("id")
+          .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"));
+      appendAsFile(df);
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntBackedDecimalsBenchmark.java
similarity index 50%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntBackedDecimalsBenchmark.java
index ec0163a..2fcab16 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntBackedDecimalsBenchmark.java
@@ -17,39 +17,28 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.spark.source.parquet.vectorized;
 
-        import com.google.common.collect.Maps;
-        import java.util.Map;
-        import org.apache.hadoop.conf.Configuration;
-        import org.apache.iceberg.PartitionSpec;
-        import org.apache.iceberg.Schema;
-        import org.apache.iceberg.Table;
-        import org.apache.iceberg.TableProperties;
-        import org.apache.iceberg.hadoop.HadoopTables;
-        import org.apache.iceberg.types.Types;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
-        import static org.apache.iceberg.types.Types.NestedField.optional;
-        import static org.apache.iceberg.types.Types.NestedField.required;
-
-public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
-
-  @Override
-  protected Configuration initHadoopConf() {
-    return new Configuration();
-  }
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
 
+public class VectorizedReadIntBackedDecimalsBenchmark extends VectorizedIcebergSourceBenchmark {
   @Override
   protected final Table initTable() {
     Schema schema = new Schema(
-            required(1, "longCol", Types.LongType.get()),
-            required(2, "intCol", Types.IntegerType.get()),
-            required(3, "floatCol", Types.FloatType.get()),
-            optional(4, "doubleCol", Types.DoubleType.get()),
-            optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-            optional(6, "dateCol", Types.DateType.get()),
-            optional(7, "timestampCol", Types.TimestampType.withZone()),
-            optional(8, "stringCol", Types.StringType.get()));
+        optional(1, "longCol", Types.LongType.get()),
+        optional(2, "decimalCol", Types.DecimalType.of(9, 0)));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = Maps.newHashMap();
@@ -57,4 +46,14 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
     properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
     return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
+
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol")
+          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(9, 0))"));
+      appendAsFile(df);
+    }
+  }
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntegersBenchmark.java
similarity index 50%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntegersBenchmark.java
index ec0163a..df88486 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntegersBenchmark.java
@@ -17,39 +17,28 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.spark.source.parquet.vectorized;
 
-        import com.google.common.collect.Maps;
-        import java.util.Map;
-        import org.apache.hadoop.conf.Configuration;
-        import org.apache.iceberg.PartitionSpec;
-        import org.apache.iceberg.Schema;
-        import org.apache.iceberg.Table;
-        import org.apache.iceberg.TableProperties;
-        import org.apache.iceberg.hadoop.HadoopTables;
-        import org.apache.iceberg.types.Types;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
-        import static org.apache.iceberg.types.Types.NestedField.optional;
-        import static org.apache.iceberg.types.Types.NestedField.required;
-
-public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
-
-  @Override
-  protected Configuration initHadoopConf() {
-    return new Configuration();
-  }
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
 
+public class VectorizedReadIntegersBenchmark extends VectorizedIcebergSourceBenchmark {
   @Override
   protected final Table initTable() {
     Schema schema = new Schema(
-            required(1, "longCol", Types.LongType.get()),
-            required(2, "intCol", Types.IntegerType.get()),
-            required(3, "floatCol", Types.FloatType.get()),
-            optional(4, "doubleCol", Types.DoubleType.get()),
-            optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-            optional(6, "dateCol", Types.DateType.get()),
-            optional(7, "timestampCol", Types.TimestampType.withZone()),
-            optional(8, "stringCol", Types.StringType.get()));
+        optional(1, "longCol", Types.LongType.get()),
+        optional(2, "intCol", Types.IntegerType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = Maps.newHashMap();
@@ -57,4 +46,14 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
     properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
     return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
+
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol")
+          .withColumn("intCol", expr("CAST(longCol AS INT)"));
+      appendAsFile(df);
+    }
+  }
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntegersTwentyPercentNullBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntegersTwentyPercentNullBenchmark.java
new file mode 100644
index 0000000..57f4de2
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadIntegersTwentyPercentNullBenchmark.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet.vectorized;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.pmod;
+import static org.apache.spark.sql.functions.when;
+
+public class VectorizedReadIntegersTwentyPercentNullBenchmark extends VectorizedReadIntegersBenchmark {
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn("longCol", when(pmod(col("id"), lit(2)).equalTo(lit(0)), lit(null)).otherwise(col("id")))
+          .drop("id")
+          .withColumn("intCol", expr("CAST(longCol AS INT)"));
+      appendAsFile(df);
+    }
+  }
+  ;
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadLongsBenchmark.java
similarity index 50%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadLongsBenchmark.java
index ec0163a..cfb7490 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadLongsBenchmark.java
@@ -17,39 +17,27 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.spark.source.parquet.vectorized;
 
-        import com.google.common.collect.Maps;
-        import java.util.Map;
-        import org.apache.hadoop.conf.Configuration;
-        import org.apache.iceberg.PartitionSpec;
-        import org.apache.iceberg.Schema;
-        import org.apache.iceberg.Table;
-        import org.apache.iceberg.TableProperties;
-        import org.apache.iceberg.hadoop.HadoopTables;
-        import org.apache.iceberg.types.Types;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
-        import static org.apache.iceberg.types.Types.NestedField.optional;
-        import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.iceberg.types.Types.NestedField.optional;
 
-public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
-
-  @Override
-  protected Configuration initHadoopConf() {
-    return new Configuration();
-  }
+public class VectorizedReadLongsBenchmark extends VectorizedIcebergSourceBenchmark {
 
   @Override
   protected final Table initTable() {
     Schema schema = new Schema(
-            required(1, "longCol", Types.LongType.get()),
-            required(2, "intCol", Types.IntegerType.get()),
-            required(3, "floatCol", Types.FloatType.get()),
-            optional(4, "doubleCol", Types.DoubleType.get()),
-            optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-            optional(6, "dateCol", Types.DateType.get()),
-            optional(7, "timestampCol", Types.TimestampType.withZone()),
-            optional(8, "stringCol", Types.StringType.get()));
+        optional(1, "longCol", Types.LongType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = Maps.newHashMap();
@@ -57,4 +45,14 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
     properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
     return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
+
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol");
+      appendAsFile(df);
+    }
+  }
+
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadLongsTwentyPercentNullBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadLongsTwentyPercentNullBenchmark.java
new file mode 100644
index 0000000..ecf9c6b
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadLongsTwentyPercentNullBenchmark.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet.vectorized;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.pmod;
+import static org.apache.spark.sql.functions.when;
+
+public class VectorizedReadLongsTwentyPercentNullBenchmark extends VectorizedReadLongsBenchmark {
+
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn("longCol", when(pmod(col("id"), lit(2)).equalTo(lit(0)), lit(null)).otherwise(col("id")))
+          .drop("id");
+
+      appendAsFile(df);
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadPrimitivesBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadPrimitivesBenchmark.java
new file mode 100644
index 0000000..3619f58
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadPrimitivesBenchmark.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet.vectorized;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.pmod;
+import static org.apache.spark.sql.functions.when;
+
+public class VectorizedReadPrimitivesBenchmark extends VectorizedIcebergSourceBenchmark {
+
+  @Override
+  protected final Table initTable() {
+    Schema schema = new Schema(
+        optional(1, "longCol", Types.LongType.get()),
+        optional(2, "intCol", Types.LongType.get()),
+        optional(3, "floatCol", Types.LongType.get()),
+        optional(4, "doubleCol", Types.LongType.get()),
+        optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+        optional(6, "dateCol", Types.DateType.get()),
+        optional(7, "timestampCol", Types.TimestampType.withZone()),
+        optional(8, "stringCol", Types.StringType.get()));
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    HadoopTables tables = new HadoopTables(hadoopConf());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
+    return tables.create(schema, partitionSpec, properties, newTableLocation());
+  }
+
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn("longCol", when(pmod(col("id"), lit(2)).equalTo(lit(0)), lit(null)).otherwise(col("id")))
+          .drop("id")
+          .withColumn("intCol", expr("CAST(longCol AS BIGINT)"))
+          .withColumn("floatCol", expr("CAST(longCol AS BIGINT)"))
+          .withColumn("doubleCol", expr("CAST(longCol AS BIGINT)"))
+          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("dateCol", date_add(current_date(), fileNum))
+          .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+          .withColumn("stringCol", expr("CAST(longCol AS STRING)"));
+      appendAsFile(df);
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadStringsBenchmark.java
similarity index 50%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadStringsBenchmark.java
index ec0163a..36d0d07 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadStringsBenchmark.java
@@ -17,39 +17,28 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.spark.source.parquet.vectorized;
 
-        import com.google.common.collect.Maps;
-        import java.util.Map;
-        import org.apache.hadoop.conf.Configuration;
-        import org.apache.iceberg.PartitionSpec;
-        import org.apache.iceberg.Schema;
-        import org.apache.iceberg.Table;
-        import org.apache.iceberg.TableProperties;
-        import org.apache.iceberg.hadoop.HadoopTables;
-        import org.apache.iceberg.types.Types;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
-        import static org.apache.iceberg.types.Types.NestedField.optional;
-        import static org.apache.iceberg.types.Types.NestedField.required;
-
-public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
-
-  @Override
-  protected Configuration initHadoopConf() {
-    return new Configuration();
-  }
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
 
+public class VectorizedReadStringsBenchmark extends VectorizedIcebergSourceBenchmark {
   @Override
   protected final Table initTable() {
     Schema schema = new Schema(
-            required(1, "longCol", Types.LongType.get()),
-            required(2, "intCol", Types.IntegerType.get()),
-            required(3, "floatCol", Types.FloatType.get()),
-            optional(4, "doubleCol", Types.DoubleType.get()),
-            optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-            optional(6, "dateCol", Types.DateType.get()),
-            optional(7, "timestampCol", Types.TimestampType.withZone()),
-            optional(8, "stringCol", Types.StringType.get()));
+        optional(1, "longCol", Types.LongType.get())
+        .optional(2, "stringCol", Types.StringType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = Maps.newHashMap();
@@ -57,4 +46,15 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
     properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
     return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
+
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol")
+          .withColumn("stringCol", expr("CAST(longCol AS STRING)"));
+
+      appendAsFile(df);
+    }
+  }
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadStringsTwentyPercentNullBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadStringsTwentyPercentNullBenchmark.java
new file mode 100644
index 0000000..199622d
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadStringsTwentyPercentNullBenchmark.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet.vectorized;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.pmod;
+import static org.apache.spark.sql.functions.when;
+
+public class VectorizedReadStringsTwentyPercentNullBenchmark extends VectorizedReadStringsBenchmark {
+  @Override
+  protected void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn("longCol", when(pmod(col("id"), lit(2)).equalTo(lit(0)), lit(null)).otherwise(col("id")))
+          .drop("id")
+          .withColumn("stringCol", expr("CAST(longCol AS STRING)"));
+
+      appendAsFile(df);
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
index 1696a6c..aad8602 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
@@ -56,7 +56,7 @@ public class VectorizedSparkParquetReaders {
       MessageType fileSchema) {
 
     return buildReader(tableSchema, expectedSchema, fileSchema,
-        VectorReader.DEFAULT_NUM_ROWS_IN_BATCH);
+        VectorReader.DEFAULT_BATCH_SIZE);
   }
 
   @SuppressWarnings("unchecked")
@@ -80,7 +80,11 @@ public class VectorizedSparkParquetReaders {
     private final BufferAllocator rootAllocator;
     private final int recordsPerBatch;
 
-    ReadBuilderBatched(Schema tableSchema, Schema projectedIcebergSchema, MessageType parquetSchema, int recordsPerBatch) {
+    ReadBuilderBatched(
+        Schema tableSchema,
+        Schema projectedIcebergSchema,
+        MessageType parquetSchema,
+        int recordsPerBatch) {
       this.parquetSchema = parquetSchema;
       this.tableIcebergSchema = tableSchema;
       this.projectedIcebergSchema = projectedIcebergSchema;
@@ -91,13 +95,15 @@ public class VectorizedSparkParquetReaders {
     }
 
     @Override
-    public BatchedReader message(Types.StructType expected, MessageType message,
+    public BatchedReader message(
+        Types.StructType expected, MessageType message,
         List<BatchedReader> fieldReaders) {
       return struct(expected, message.asGroupType(), fieldReaders);
     }
 
     @Override
-    public BatchedReader struct(Types.StructType expected, GroupType struct,
+    public BatchedReader struct(
+        Types.StructType expected, GroupType struct,
         List<BatchedReader> fieldReaders) {
 
       // this works on struct fields and the root iceberg schema which itself is a struct.
@@ -137,9 +143,9 @@ public class VectorizedSparkParquetReaders {
       return new ColumnarBatchReader(types, expected, reorderedFields);
     }
 
-
     @Override
-    public BatchedReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+    public BatchedReader primitive(
+        org.apache.iceberg.types.Type.PrimitiveType expected,
         PrimitiveType primitive) {
 
       // Create arrow vector for this field
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index f29464a..5114db2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -55,11 +55,11 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.VectorReader;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.parquet.VectorReader;
 import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
@@ -127,7 +127,7 @@ class Reader implements DataSourceReader,
 
     } else {
 
-      this.numRecordsPerBatch = VectorReader.DEFAULT_NUM_ROWS_IN_BATCH;
+      this.numRecordsPerBatch = VectorReader.DEFAULT_BATCH_SIZE;
     }
     LOG.info("=> Set Config numRecordsPerBatch = {}", numRecordsPerBatch);
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 31f6231..d833001 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -26,7 +26,6 @@ import org.apache.iceberg.types.Types.ListType;
 import org.apache.iceberg.types.Types.LongType;
 import org.apache.iceberg.types.Types.MapType;
 import org.apache.iceberg.types.Types.StructType;
-import org.apache.spark.sql.types.Decimal;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -40,31 +39,23 @@ public abstract class AvroDataTest {
 
   protected static final StructType SUPPORTED_PRIMITIVES = StructType.of(
 
-          required(100, "id", LongType.get()),
-//          optional(101, "data", Types.StringType.get()),
-//          required(102, "b", Types.BooleanType.get()),
-//          optional(103, "i", Types.IntegerType.get()),
-//          required(104, "l", LongType.get()),
-//          optional(105, "f", Types.FloatType.get()),
-//          required(106, "d", Types.DoubleType.get()),
-//          optional(107, "date", Types.DateType.get()),
-//          required(108, "ts", Types.TimestampType.withZone()),
-//          required(110, "s", Types.StringType.get()),
-//          //required(111, "uuid", Types.UUIDType.get()),
-//          required(112, "fixed", Types.FixedType.ofLength(7)),
-//          optional(113, "bytes", Types.BinaryType.get()),
-//          required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
-//          required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
-//          required(116, "dec_38_10", Types.DecimalType.of(38, 10)),
-//          required(117, "dec_38_0", Types.DecimalType.of(38, 0)));
-
-        required(113, "dec_28_0", Types.DecimalType.of(28, 0)),
-          required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
-          required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
-          required(116, "dec_38_10", Types.DecimalType.of(38, 10)),
-          required(117, "dec_38_0", Types.DecimalType.of(38, 0)),
-          required(118, "dec_9_5", Types.DecimalType.of(9, 5)),
-        required(119, "dec_20_5", Types.DecimalType.of(20, 5)));
+         required(100, "id", LongType.get()),
+         required(101, "data", Types.StringType.get()),
+         required(102, "b", Types.BooleanType.get()),
+         optional(103, "i", Types.IntegerType.get()),
+         required(104, "l", LongType.get()),
+         optional(105, "f", Types.FloatType.get()),
+         optional(106, "d", Types.DoubleType.get()),
+         optional(107, "date", Types.DateType.get()),
+         optional(108, "ts", Types.TimestampType.withZone()),
+         optional(110, "s", Types.StringType.get()),
+         // //required(111, "uuid", Types.UUIDType.get()),
+         optional(112, "fixed", Types.FixedType.ofLength(7)),
+         optional(113, "bytes", Types.BinaryType.get()),
+         required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
+         required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
+         optional(116, "dec_38_10", Types.DecimalType.of(38, 10)),
+         required(117, "dec_38_0", Types.DecimalType.of(38, 0)));
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 100d89a..ba60634 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -224,8 +224,7 @@ public class TestHelpers {
         //     + " , type:"+fieldType.typeId()
         //     + " , expected:"+expectedValue);
         if (actualRow.isNullAt(i)) {
-
-          Assert.assertTrue("Expect null at " + r , expectedValue == null);
+          Assert.assertTrue("Expect null at " + r, expectedValue == null);
         } else {
           Object actualValue = actualRow.get(i, convert(fieldType));
           assertEqualsUnsafe(fieldType, expectedValue, actualValue);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
index 6c4b21b..4c3786c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
@@ -19,28 +19,9 @@
 
 package org.apache.iceberg.spark.data;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.ParquetAvroValueReaders;
-import org.apache.iceberg.parquet.ParquetSchemaUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.parquet.schema.MessageType;
-import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
-import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
 public class TestParquetAvroReader {
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
index 8653640..c234caa 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
@@ -19,21 +19,9 @@
 
 package org.apache.iceberg.spark.data;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.ParquetAvroValueReaders;
-import org.apache.iceberg.parquet.ParquetAvroWriter;
-import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.schema.MessageType;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index ce7cbf0..ace908e 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -19,23 +19,8 @@
 
 package org.apache.iceberg.spark.data;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.avro.generic.GenericData;
-import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.junit.Assert;
-import org.junit.Assume;
-
-import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
 
 public class TestSparkParquetReader extends AvroDataTest {
   @Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
index f8f080c..06909c5 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
@@ -45,7 +45,8 @@ public class TestSparkParquetVectorizedReader extends AvroDataTest {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-
+    System.setProperty("arrow.enable_unsafe_memory_access", "true");
+    System.setProperty("arrow.enable_null_check_for_get", "false");
     // Write test data
     Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
         type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
index ba352de..01d9d01 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -19,19 +19,9 @@
 
 package org.apache.iceberg.spark.data;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.junit.Assert;
 import org.junit.Rule;
-import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
diff --git a/versions.lock b/versions.lock
index 2f314a2..268c500 100644
--- a/versions.lock
+++ b/versions.lock
@@ -7,11 +7,11 @@ com.carrotsearch:hppc:0.7.2 (1 constraints: f70cda14)
 com.clearspring.analytics:stream:2.7.0 (1 constraints: 1a0dd136)
 com.esotericsoftware:kryo-shaded:4.0.2 (2 constraints: b71345a6)
 com.esotericsoftware:minlog:1.3.0 (1 constraints: 670e7c4f)
-com.fasterxml.jackson.core:jackson-annotations:2.7.9 (5 constraints: f154e19f)
-com.fasterxml.jackson.core:jackson-core:2.7.9 (5 constraints: d748db55)
-com.fasterxml.jackson.core:jackson-databind:2.7.9 (9 constraints: a688bc53)
-com.fasterxml.jackson.module:jackson-module-paranamer:2.7.9 (1 constraints: e0154200)
-com.fasterxml.jackson.module:jackson-module-scala_2.11:2.7.9 (1 constraints: 7f0da251)
+com.fasterxml.jackson.core:jackson-annotations:2.9.8 (5 constraints: f5545da2)
+com.fasterxml.jackson.core:jackson-core:2.9.8 (5 constraints: da48f956)
+com.fasterxml.jackson.core:jackson-databind:2.9.8 (9 constraints: a988d456)
+com.fasterxml.jackson.module:jackson-module-paranamer:2.9.8 (1 constraints: e1154700)
+com.fasterxml.jackson.module:jackson-module-scala_2.11:2.9.8 (1 constraints: 7f0da251)
 com.github.ben-manes.caffeine:caffeine:2.7.0 (1 constraints: 0b050a36)
 com.github.luben:zstd-jni:1.3.2-2 (1 constraints: 760d7c51)
 com.google.code.findbugs:jsr305:3.0.2 (9 constraints: d276cf3c)
@@ -80,7 +80,7 @@ javax.ws.rs:javax.ws.rs-api:2.0.1 (5 constraints: 6e649355)
 javax.xml.bind:jaxb-api:2.2.11 (6 constraints: a069fd48)
 javolution:javolution:5.5.1 (1 constraints: f00d1a43)
 jline:jline:2.12 (3 constraints: 98208776)
-joda-time:joda-time:2.9.9 (5 constraints: c2326fe6)
+joda-time:joda-time:2.9.9 (4 constraints: c125a2fb)
 log4j:apache-log4j-extras:1.2.17 (4 constraints: 3f36b1af)
 log4j:log4j:1.2.17 (12 constraints: 22ab5529)
 net.hydromatic:eigenbase-properties:1.1.5 (1 constraints: 5f0daf2c)
@@ -95,9 +95,9 @@ org.antlr:antlr4-runtime:4.7 (1 constraints: 7a0e125f)
 org.antlr:stringtemplate:3.2.1 (1 constraints: c10a3bc6)
 org.apache.ant:ant:1.9.1 (3 constraints: a721ed14)
 org.apache.ant:ant-launcher:1.9.1 (1 constraints: 69082485)
-org.apache.arrow:arrow-format:0.12.0 (1 constraints: 210ded21)
-org.apache.arrow:arrow-memory:0.12.0 (1 constraints: 210ded21)
-org.apache.arrow:arrow-vector:0.12.0 (2 constraints: 1d122345)
+org.apache.arrow:arrow-format:0.14.1 (1 constraints: 240df421)
+org.apache.arrow:arrow-memory:0.14.1 (1 constraints: 240df421)
+org.apache.arrow:arrow-vector:0.14.1 (2 constraints: 2012a545)
 org.apache.avro:avro:1.8.2 (5 constraints: 083cf387)
 org.apache.avro:avro-ipc:1.8.2 (1 constraints: f90b5bf4)
 org.apache.avro:avro-mapred:1.8.2 (2 constraints: 3a1a4787)
@@ -216,8 +216,8 @@ org.mortbay.jetty:servlet-api:2.5-20081211 (1 constraints: 390cbd19)
 org.mortbay.jetty:servlet-api-2.5:6.1.14 (2 constraints: e51482f7)
 org.objenesis:objenesis:2.5.1 (2 constraints: 19198bcb)
 org.roaringbitmap:RoaringBitmap:0.5.11 (1 constraints: 480d0a44)
-org.scala-lang:scala-library:2.11.12 (11 constraints: 319b89e7)
-org.scala-lang:scala-reflect:2.11.12 (2 constraints: 3d25c8bc)
+org.scala-lang:scala-library:2.11.12 (11 constraints: 5c9bfe44)
+org.scala-lang:scala-reflect:2.11.12 (1 constraints: 340fb09a)
 org.scala-lang.modules:scala-parser-combinators_2.11:1.1.0 (1 constraints: cf0e717c)
 org.scala-lang.modules:scala-xml_2.11:1.0.6 (1 constraints: 080b84e9)
 org.slf4j:jcl-over-slf4j:1.7.16 (1 constraints: 500d1d44)
diff --git a/versions.props b/versions.props
index 80c334e..e418732 100644
--- a/versions.props
+++ b/versions.props
@@ -1,7 +1,7 @@
 org.slf4j:slf4j-api = 1.7.5
 com.google.guava:guava = 28.0-jre
 org.apache.avro:avro = 1.8.2
-org.apache.arrow:arrow-vector = 0.12.0
+org.apache.arrow:arrow-vector = 0.14.1
 org.apache.hadoop:* = 2.7.3
 org.apache.hive:hive-standalone-metastore = 1.2.1
 org.apache.orc:orc-core = 1.5.5