You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/20 23:42:04 UTC

[incubator-iceberg] branch vectorized-read updated: Clean up code, rename classes, add unit tests for dictionary encoded data (#664)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


View the commit online:
https://github.com/apache/incubator-iceberg/commit/25c7e4a4a8d3d57cfc76dcb45ea4434f0ffc9895

The following commit(s) were added to refs/heads/vectorized-read by this push:
     new 25c7e4a  Clean up code, rename classes, add unit tests for dictionary encoded data (#664)
25c7e4a is described below

commit 25c7e4a4a8d3d57cfc76dcb45ea4434f0ffc9895
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Wed Nov 20 15:41:56 2019 -0800

    Clean up code, rename classes, add unit tests for dictionary encoded data (#664)
---
 .../org/apache/iceberg/parquet/BatchedReader.java  |    7 -
 .../java/org/apache/iceberg/parquet/Parquet.java   |    5 +-
 .../org/apache/iceberg/parquet/ParquetReader.java  |   17 +-
 .../iceberg/parquet/VectorizedValuesReader.java    | 1304 --------------------
 .../parquet/arrow/IcebergArrowColumnVector.java    |   22 +-
 .../parquet/arrow/IcebergDecimalArrowVector.java   |   21 +-
 .../parquet/arrow/IcebergVarBinaryArrowVector.java |    3 +-
 .../parquet/arrow/IcebergVarcharArrowVector.java   |    5 +-
 .../ColumnarBatchReaders.java}                     |   31 +-
 .../{ => vectorized}/NullabilityHolder.java        |    2 +-
 .../iceberg/parquet/vectorized/VectorHolder.java   |   64 +
 .../VectorizedArrowReader.java}                    |   93 +-
 .../VectorizedColumnIterator.java}                 |   62 +-
 .../VectorizedPageIterator.java}                   |   62 +-
 .../vectorized/VectorizedParquetValuesReader.java  | 1233 ++++++++++++++++++
 .../VectorizedReader.java}                         |   40 +-
 ...ectorizedDictionaryEncodedStringsBenchmark.java |    7 +-
 ...edFallbackToPlainEncodingStringsBenchmark.java} |   60 +-
 .../data/vector/VectorizedSparkParquetReaders.java |  248 ++--
 .../org/apache/iceberg/spark/source/Reader.java    |    4 +-
 .../apache/iceberg/spark/data/AvroDataTest.java    |    6 +-
 .../apache/iceberg/spark/data/DictionaryData.java  |  289 +++++
 .../org/apache/iceberg/spark/data/TestHelpers.java |   33 +-
 ...kParquetDictionaryEncodedVectorizedReader.java} |   14 +-
 ...ackToDictionaryEncodingForVectorizedReader.java |    1 -
 .../data/TestSparkParquetVectorizedReader.java     |   23 +-
 26 files changed, 1938 insertions(+), 1718 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/BatchedReader.java
deleted file mode 100644
index 335636e..0000000
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedReader.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.iceberg.parquet;
-
-/**
- * Marker interface for vectorized Iceberg readers.
- */
-public interface BatchedReader {
-}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 5b8d53c..9d83ce8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.vectorized.VectorizedReader;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.avro.AvroReadSupport;
@@ -287,7 +288,7 @@ public class Parquet {
     private StructType sparkSchema = null;
     private Expression filter = null;
     private ReadSupport<?> readSupport = null;
-    private Function<MessageType, BatchedReader> readerFunc = null;
+    private Function<MessageType, VectorizedReader> readerFunc = null;
     private boolean filterRecords = true;
     private boolean caseSensitive = true;
     private Map<String, String> properties = Maps.newHashMap();
@@ -347,7 +348,7 @@ public class Parquet {
       return this;
     }
 
-    public ReadBuilder createReaderFunc(Function<MessageType, BatchedReader> readerFunc) {
+    public ReadBuilder createReaderFunc(Function<MessageType, VectorizedReader> readerFunc) {
       this.readerFunc = readerFunc;
       return this;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 5a7ad4c..b525454 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.*;
 import java.util.function.Function;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
@@ -32,6 +31,8 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.vectorized.ColumnarBatchReaders;
+import org.apache.iceberg.parquet.vectorized.VectorizedReader;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
@@ -54,14 +55,14 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private final InputFile input;
   private final Schema expectedSchema;
   private final ParquetReadOptions options;
-  private final Function<MessageType, BatchedReader> readerFunc;
+  private final Function<MessageType, VectorizedReader> readerFunc;
   private final Expression filter;
   private final boolean reuseContainers;
   private final boolean caseSensitive;
   private static final Logger LOG = LoggerFactory.getLogger(ParquetReader.class);
 
   public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
-      Function<MessageType, BatchedReader> readerFunc,
+      Function<MessageType, VectorizedReader> readerFunc,
       Expression filter, boolean reuseContainers, boolean caseSensitive,
       StructType sparkSchema, int maxRecordsPerBatch) {
     this.input = input;
@@ -79,7 +80,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
     private final InputFile file;
     private final ParquetReadOptions options;
     private final MessageType projection;
-    private final ColumnarBatchReader model;
+    private final ColumnarBatchReaders model;
     private final List<BlockMetaData> rowGroups;
     private final boolean[] shouldSkip;
     private final long totalValues;
@@ -87,8 +88,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
 
     @SuppressWarnings("unchecked")
     ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
-        Function<MessageType, ColumnarBatchReader> readerFunc, boolean reuseContainers,
-        boolean caseSensitive) {
+             Function<MessageType, ColumnarBatchReaders> readerFunc, boolean reuseContainers,
+             boolean caseSensitive) {
       this.file = file;
       this.options = options;
       this.reader = newReader(file, options);
@@ -152,7 +153,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
       return newReader;
     }
 
-    ColumnarBatchReader model() {
+    ColumnarBatchReaders model() {
       return model;
     }
 
@@ -206,7 +207,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private static class FileIterator implements Iterator, Closeable {
     private final ParquetFileReader reader;
     private final boolean[] shouldSkip;
-    private final ColumnarBatchReader model;
+    private final ColumnarBatchReaders model;
     private final long totalValues;
     private final boolean reuseContainers;
     //private final List<BlockMetaData> blockMetaDataList;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java
deleted file mode 100644
index b7ebfed..0000000
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java
+++ /dev/null
@@ -1,1304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iceberg.parquet;
-
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.*;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.bitpacking.BytePacker;
-import org.apache.parquet.column.values.bitpacking.Packer;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * A values reader for Parquet's run-length encoded data. This is based off of the version in
- * parquet-mr with these changes:
- * - Supports the vectorized interface.
- * - Works on byte arrays(byte[]) instead of making byte streams.
- * <p>
- * This encoding is used in multiple places:
- * - Definition/Repetition levels
- * - Dictionary ids.
- */
-public final class VectorizedValuesReader extends ValuesReader {
-  // Current decoding mode. The encoded data contains groups of either run length encoded data
-  // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
-  // the number of values in the group.
-  // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
-  private enum MODE {
-    RLE,
-    PACKED
-  }
-
-  // Encoded data.
-  private ByteBufferInputStream in;
-
-  // bit/byte width of decoded data and utility to batch unpack them.
-  private int bitWidth;
-  private int bytesWidth;
-  private BytePacker packer;
-
-  // Current decoding mode and values
-  private MODE mode;
-  private int currentCount;
-  private int currentValue;
-
-  // Buffer of decoded values if the values are PACKED.
-  private int[] currentBuffer = new int[16];
-  private int currentBufferIdx = 0;
-
-  // If true, the bit width is fixed. This decoder is used in different places and this also
-  // controls if we need to read the bitwidth from the beginning of the data stream.
-  private final boolean fixedWidth;
-  private final boolean readLength;
-  private final int maxDefLevel;
-
-  public VectorizedValuesReader(int maxDefLevel) {
-    this.maxDefLevel = maxDefLevel;
-    this.fixedWidth = false;
-    this.readLength = false;
-  }
-
-  public VectorizedValuesReader(
-      int bitWidth,
-      int maxDefLevel) {
-    this.fixedWidth = true;
-    this.readLength = bitWidth != 0;
-    this.maxDefLevel = maxDefLevel;
-    init(bitWidth);
-  }
-
-  public VectorizedValuesReader(
-      int bitWidth,
-      boolean readLength,
-      int maxDefLevel) {
-    this.fixedWidth = true;
-    this.readLength = readLength;
-    this.maxDefLevel = maxDefLevel;
-    init(bitWidth);
-  }
-
-  @Override
-  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
-    this.in = in;
-    if (fixedWidth) {
-      // initialize for repetition and definition levels
-      if (readLength) {
-        int length = readIntLittleEndian();
-        this.in = in.sliceStream(length);
-      }
-    } else {
-      // initialize for values
-      if (in.available() > 0) {
-        init(in.read());
-      }
-    }
-    if (bitWidth == 0) {
-      // 0 bit width, treat this as an RLE run of valueCount number of 0's.
-      this.mode = MODE.RLE;
-      this.currentCount = valueCount;
-      this.currentValue = 0;
-    } else {
-      this.currentCount = 0;
-    }
-  }
-
-
-
-  /**
-   * Initializes the internal state for decoding ints of `bitWidth`.
-   */
-  private void init(int bitWidth) {
-    Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
-    this.bitWidth = bitWidth;
-    this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
-    this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
-  }
-
-  /**
-   * Reads the next varint encoded int.
-   */
-  private int readUnsignedVarInt() throws IOException {
-    int value = 0;
-    int shift = 0;
-    int b;
-    do {
-      b = in.read();
-      value |= (b & 0x7F) << shift;
-      shift += 7;
-    } while ((b & 0x80) != 0);
-    return value;
-  }
-
-  /**
-   * Reads the next 4 byte little endian int.
-   */
-  private int readIntLittleEndian() throws IOException {
-    int ch4 = in.read();
-    int ch3 = in.read();
-    int ch2 = in.read();
-    int ch1 = in.read();
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-  }
-
-  /**
-   * Reads the next byteWidth little endian int.
-   */
-  private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
-    switch (bytesWidth) {
-      case 0:
-        return 0;
-      case 1:
-        return in.read();
-      case 2: {
-        int ch2 = in.read();
-        int ch1 = in.read();
-        return (ch1 << 8) + ch2;
-      }
-      case 3: {
-        int ch3 = in.read();
-        int ch2 = in.read();
-        int ch1 = in.read();
-        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
-      }
-      case 4: {
-        return readIntLittleEndian();
-      }
-    }
-    throw new RuntimeException("Unreachable");
-  }
-
-  private int ceil8(int value) {
-    return (value + 7) / 8;
-  }
-
-  /**
-   * Reads the next group.
-   */
-  private void readNextGroup() {
-    try {
-      int header = readUnsignedVarInt();
-      this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
-      switch (mode) {
-        case RLE:
-          this.currentCount = header >>> 1;
-          this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-          return;
-        case PACKED:
-          int numGroups = header >>> 1;
-          this.currentCount = numGroups * 8;
-
-          if (this.currentBuffer.length < this.currentCount) {
-            this.currentBuffer = new int[this.currentCount];
-          }
-          currentBufferIdx = 0;
-          int valueIndex = 0;
-          while (valueIndex < this.currentCount) {
-            // values are bit packed 8 at a time, so reading bitWidth will always work
-            ByteBuffer buffer = in.slice(bitWidth);
-            this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
-            valueIndex += 8;
-          }
-          return;
-        default:
-          throw new ParquetDecodingException("not a valid mode " + this.mode);
-      }
-    } catch (IOException e) {
-      throw new ParquetDecodingException("Failed to read from input stream", e);
-    }
-  }
-
-  @Override
-  public boolean readBoolean() {
-    return this.readInteger() != 0;
-  }
-
-  @Override
-  public void skip() {
-    this.readInteger();
-  }
-
-  @Override
-  public int readValueDictionaryId() {
-    return readInteger();
-  }
-
-  @Override
-  public int readInteger() {
-    if (this.currentCount == 0) {
-      this.readNextGroup();
-    }
-
-    this.currentCount--;
-    switch (mode) {
-      case RLE:
-        return this.currentValue;
-      case PACKED:
-        return this.currentBuffer[currentBufferIdx++];
-    }
-    throw new RuntimeException("Unreachable");
-  }
-
-  public void readBatchOfDictionaryIds(
-          final IntVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader dictionaryEncodedValuesReader) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, idx, n);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              vector.set(idx, dictionaryEncodedValuesReader.readInteger());
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
-  // check definition level.
-  private void readDictionaryIdsInternal(final IntVector c, final int numValsInVector, final int numValuesToRead) {
-    int left = numValuesToRead;
-    int idx = numValsInVector;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            c.set(idx, currentValue);
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            c.set(idx, currentBuffer[currentBufferIdx]);
-            currentBufferIdx++;
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfIntegers(
-      final FieldVector vector, final int numValsInVector,
-      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    ArrowBuf validityBuffer = vector.getValidityBuffer();
-    ArrowBuf dataBuffer = vector.getDataBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          bufferIdx =
-              fillFixWidthValueBuffer(
-                  typeWidth,
-                  maxDefLevel,
-                  nullabilityHolder,
-                  valuesReader,
-                  bufferIdx,
-                  dataBuffer,
-                  n);
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
-              //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
-              dataBuffer.setInt(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth).getInt());
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-              bufferIdx++;
-            } else {
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedIntegers(
-          final FieldVector vector, final int numValsInVector,
-          final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setInt(idx, dict.decodeToInt(valuesReader.readInteger()));
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedIntegersInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setInt(idx, dict.decodeToInt(currentValue));
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setInt(idx, dict.decodeToInt(currentBuffer[currentBufferIdx++]));
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfLongs(
-      final FieldVector vector, final int numValsInVector,
-      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    ArrowBuf validityBuffer = vector.getValidityBuffer();
-    ArrowBuf dataBuffer = vector.getDataBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          bufferIdx =
-              fillFixWidthValueBuffer(
-                  typeWidth,
-                  maxDefLevel,
-                  nullabilityHolder,
-                  valuesReader,
-                  bufferIdx,
-                  dataBuffer,
-                  n);
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
-              //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
-              dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-              bufferIdx++;
-            } else {
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedLongs(
-          final FieldVector vector, final int numValsInVector,
-          final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setLong(idx, dict.decodeToLong(valuesReader.readInteger()));
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedLongsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentValue));
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentBuffer[currentBufferIdx++]));
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfFloats(
-      final FieldVector vector, final int numValsInVector,
-      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    ArrowBuf validityBuffer = vector.getValidityBuffer();
-    ArrowBuf dataBuffer = vector.getDataBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          bufferIdx =
-              fillFixWidthValueBuffer(
-                  typeWidth,
-                  maxDefLevel,
-                  nullabilityHolder,
-                  valuesReader,
-                  bufferIdx,
-                  dataBuffer,
-                  n);
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
-              //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
-              dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
-              //dataBuffer.setFloat(bufferIdx, valuesReader.getBuffer(typeWidth).getFloat());
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-              bufferIdx++;
-            } else {
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedFloats(
-          final FieldVector vector, final int numValsInVector,
-          final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(valuesReader.readInteger()));
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedFloatsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentValue));
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentBuffer[currentBufferIdx++]));
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDoubles(
-      final FieldVector vector, final int numValsInVector,
-      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-      BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    ArrowBuf validityBuffer = vector.getValidityBuffer();
-    ArrowBuf dataBuffer = vector.getDataBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          bufferIdx =
-              fillFixWidthValueBuffer(
-                  typeWidth,
-                  maxDefLevel,
-                  nullabilityHolder,
-                  valuesReader,
-                  bufferIdx,
-                  dataBuffer,
-                  n);
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-              bufferIdx++;
-            } else {
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedDoubles(
-          final FieldVector vector, final int numValsInVector,
-          final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(valuesReader.readInteger()));
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedDoublesInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentValue));
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentBuffer[currentBufferIdx++]));
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfFixedWidthBinary(
-      final FieldVector vector, final int numValsInVector,
-      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-      BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    ArrowBuf validityBuffer = vector.getValidityBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            // for (int i = 0; i < n; i++) {
-            //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-            //   validityBufferIdx++;
-            // }
-            for (int i = 0; i < n; i++) {
-              bufferIdx = setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
-            }
-          } else {
-            for (int i = 0; i < n; i++) {
-              //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              bufferIdx = setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
-            } else {
-              //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedFixedWidthBinary(
-          final FieldVector vector, final int numValsInVector,
-          final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentBuffer[currentBufferIdx++]).getBytesUnsafe());
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfFixedLengthDecimals(
-      final FieldVector vector, final int numValsInVector,
-      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-      BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    //ArrowBuf validityBuffer = vector.getValidityBuffer();
-    //ArrowBuf dataBuffer = vector.getDataBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            // for (int i = 0; i < n; i++) {
-            //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-            //   validityBufferIdx++;
-            // }
-            for (int i = 0; i < n; i++) {
-              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-              //bytesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-              ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
-              bufferIdx++;
-            }
-          } else {
-            for (int i = 0; i < n; i++) {
-              //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-              ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
-              bufferIdx++;
-            } else {
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedFixedLengthDecimals(
-          final FieldVector vector, final int numValsInVector,
-          final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
-              byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-              System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-              ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            // TODO: samarth I am assuming/hopeful that the decimalBytes array has typeWidth length
-            byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
-            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            // TODO: samarth I am assuming/hopeful that the decimal bytes has typeWidth length
-            byte[] decimalBytes = dict.decodeToBinary(currentBuffer[currentBufferIdx++]).getBytesUnsafe();
-            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  /**
-   * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
-   * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
-   * vector. It appropriately sets the validity buffer in the Arrow vector.
-   */
-  public void readBatchVarWidth(
-      final FieldVector vector,
-      final int numValsInVector,
-      final int batchSize,
-      NullabilityHolder nullabilityHolder,
-      BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    ArrowBuf dataBuffer = vector.getDataBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            // for (int i = 0; i < n; i++) {
-            //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-            //   validityBufferIdx++;
-            // }
-            for (int i = 0; i < n; i++) {
-              int len = valuesReader.readInteger();
-              ByteBuffer buffer = valuesReader.getBuffer(len);
-              ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
-              dataBuffer.writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
-              bufferIdx++;
-            }
-          } else {
-            //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-            nullabilityHolder.setNulls(bufferIdx, n);
-            bufferIdx += n;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              int len = valuesReader.readInteger();
-              ByteBuffer buffer = valuesReader.getBuffer(len);
-              ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
-              dataBuffer.writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
-              bufferIdx++;
-            } else {
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedVarWidth(
-          final FieldVector vector, final int numValsInVector,
-          final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader dictionaryEncodedValuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              ((BaseVariableWidthVector)vector).setSafe(idx, dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe());
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(FieldVector vector, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            ((BaseVariableWidthVector)vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            ((BaseVariableWidthVector)vector).setSafe(idx, dict.decodeToBinary(currentBuffer[currentBufferIdx++]).getBytesUnsafe());
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfIntLongBackedDecimals(
-      final FieldVector vector, final int numValsInVector,
-      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-      BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    ArrowBuf validityBuffer = vector.getValidityBuffer();
-    ArrowBuf dataBuffer = vector.getDataBuffer();
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            for (int i = 0; i < n; i++) {
-              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-              dataBuffer.setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
-              bufferIdx++;
-            }
-          } else {
-            nullabilityHolder.setNulls(bufferIdx, n);
-            bufferIdx += n;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-              dataBuffer.setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
-              bufferIdx++;
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-            } else {
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
-          final FieldVector vector, final int numValsInVector,
-          final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
-    int idx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
-            idx += n;
-          } else {
-            for (int i = 0; i < n; i++) {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(valuesReader.readInteger()) : dict.decodeToLong(valuesReader.readInteger())));
-              idx++;
-            } else {
-              nullabilityHolder.setNull(idx);
-              idx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(FieldVector vector, final int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-    int left = numValuesToRead;
-    while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          for (int i = 0; i < n; i++) {
-            ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
-            idx++;
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; i++) {
-            ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentBuffer[currentBufferIdx++])));
-            idx++;
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  public void readBatchOfBooleans(
-      final FieldVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-    int bufferIdx = numValsInVector;
-    int left = batchSize;
-    while (left > 0) {
-      if (this.currentCount == 0) {
-        this.readNextGroup();
-      }
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefLevel) {
-            for (int i = 0; i < n; i++) {
-              ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
-              bufferIdx++;
-            }
-          } else {
-            for (int i = 0; i < n; i++) {
-              ((BitVector) vector).setNull(bufferIdx);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
-              ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
-              bufferIdx++;
-            } else {
-              ((BitVector) vector).setNull(bufferIdx);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
-            }
-          }
-          break;
-      }
-      left -= n;
-      currentCount -= n;
-    }
-  }
-
-  private int setBinaryInVector(VarBinaryVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
-    byte[] byteArray = new byte[typeWidth];
-    valuesReader.getBuffer(typeWidth).get(byteArray);
-    vector.setSafe(bufferIdx, byteArray);
-    bufferIdx++;
-    return bufferIdx;
-  }
-
-  private int fillFixWidthValueBuffer(
-      int typeWidth, int maxDefLevel, NullabilityHolder nullabilityHolder,
-      BytesReader valuesReader, int bufferIdx, ArrowBuf dataBuffer, int n) {
-    if (currentValue == maxDefLevel) {
-      // for (int i = 0; i < n; i++) {
-      //   //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-      //   validityBufferIdx++;
-      // }
-      ByteBuffer buffer = valuesReader.getBuffer(n * typeWidth);
-      dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
-      bufferIdx += n;
-    } else {
-      for (int i = 0; i < n; i++) {
-        //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
-        nullabilityHolder.setNull(bufferIdx);
-        bufferIdx++;
-      }
-    }
-    return bufferIdx;
-  }
-
-}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
index a3c8460..7b30948 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.holders.NullableVarCharHolder;
-import org.apache.iceberg.parquet.NullabilityHolder;
-import org.apache.iceberg.parquet.ParquetUtil;
-import org.apache.iceberg.parquet.VectorReader;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.VectorHolder;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -58,7 +59,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
   private final boolean isVectorDictEncoded;
   private ArrowColumnVector[] childColumns;
 
-  public IcebergArrowColumnVector(VectorReader.VectorHolder holder, NullabilityHolder nulls) {
+  public IcebergArrowColumnVector(VectorHolder holder, NullabilityHolder nulls) {
     super(ArrowUtils.fromArrowField(holder.getVector().getField()));
     this.nullabilityHolder = nulls;
     this.columnDescriptor = holder.getDescriptor();
@@ -67,6 +68,11 @@ public class IcebergArrowColumnVector extends ColumnVector {
     this.accessor = getVectorAccessor(columnDescriptor, holder.getVector());
   }
 
+  @VisibleForTesting
+  public ArrowVectorAccessor getAccessor() {
+    return accessor;
+  }
+
   @Override
   public void close() {
     if (childColumns != null) {
@@ -169,7 +175,8 @@ public class IcebergArrowColumnVector extends ColumnVector {
   @Override
   public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
 
-  private abstract class ArrowVectorAccessor {
+  @VisibleForTesting
+  public abstract class ArrowVectorAccessor {
 
     private final ValueVector vector;
 
@@ -229,6 +236,11 @@ public class IcebergArrowColumnVector extends ColumnVector {
     ColumnarArray getArray(int rowId) {
       throw new UnsupportedOperationException();
     }
+
+    @VisibleForTesting
+    public ValueVector getUnderlyingArrowVector() {
+      return vector;
+    }
   }
 
   private ArrowVectorAccessor getVectorAccessor(ColumnDescriptor desc, ValueVector vector) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
index 1815414..9a833b0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
@@ -1,8 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.DecimalVector;
-import org.apache.iceberg.parquet.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
 
 /**
  *
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
index 7115564..6d8922b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.iceberg.parquet.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
 
 /**
  * Extension of Arrow's @{@link VarBinaryVector}. The whole reason of having
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
index 2b7bc53..646ddd1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VarCharVector;
-import org.apache.iceberg.parquet.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
 
 /**
- * Extension of Arrow's @{@link VarCharVector}. The whole reason of having
+ * Extension of Arrow's {@link VarCharVector}. The reason for having
  * this implementation is to override the expensive {@link VarCharVector#isSet(int)} method.
  */
 public class IcebergVarcharArrowVector extends VarCharVector {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
similarity index 67%
rename from parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
index 7204278..64c78af 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
@@ -1,33 +1,34 @@
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
 
 import org.apache.arrow.vector.FieldVector;
 import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Array;
 import java.util.List;
 import java.util.Map;
 
-public class ColumnarBatchReader implements BatchedReader {
-    private static final Logger LOG = LoggerFactory.getLogger(ColumnarBatchReader.class);
-    private final VectorReader[] readers;
+/**
+ * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's
+ * vectorized read path. The {@link ColumnarBatch} returned is created by passing in the
+ * Arrow vectors populated via delegated read calls to {@linkplain VectorizedArrowReader VectorizedArrowReader(s)}.
+ */
+public class ColumnarBatchReaders implements VectorizedReader {
+    private final VectorizedArrowReader[] readers;
 
-    public ColumnarBatchReader(List<Type> types,
-                               Types.StructType icebergExpectedFields,
-                               List<BatchedReader> readers) {
-        this.readers = (VectorReader[]) Array.newInstance(
-                VectorReader.class, readers.size());
+    public ColumnarBatchReaders(List<Type> types,
+                                Types.StructType icebergExpectedFields,
+                                List<VectorizedReader> readers) {
+        this.readers = (VectorizedArrowReader[]) Array.newInstance(
+                VectorizedArrowReader.class, readers.size());
         int i = 0;
-        for (BatchedReader reader : readers) {
-            this.readers[i] = (VectorReader) reader;
+        for (VectorizedReader reader : readers) {
+            this.readers[i] = (VectorizedArrowReader) reader;
             i++;
         }
 
@@ -47,7 +48,7 @@ public class ColumnarBatchReader implements BatchedReader {
         int numRows = 0;
         for (int i = 0; i < readers.length; i += 1) {
             NullabilityHolder nullabilityHolder = new NullabilityHolder(readers[i].batchSize());
-            VectorReader.VectorHolder holder = readers[i].read(nullabilityHolder);
+            VectorHolder holder = readers[i].read(nullabilityHolder);
             FieldVector vector = holder.getVector();
             icebergArrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
             if (i > 0 && numRows != vector.getValueCount()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
similarity index 96%
copy from parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
copy to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
index dbffdb0..70b84d9 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
 
 public class NullabilityHolder {
   private final boolean[] isNull;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
new file mode 100644
index 0000000..f11c968
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container class for holding the Arrow vector holding a batch of values
+ * along with other state needed for reading values out of it.
+ */
+public class VectorHolder {
+    private final ColumnDescriptor columnDescriptor;
+    private final FieldVector vector;
+    private final boolean isDictionaryEncoded;
+
+    @Nullable
+    private final Dictionary dictionary;
+
+
+    public VectorHolder(ColumnDescriptor columnDescriptor, FieldVector vector, boolean isDictionaryEncoded, Dictionary dictionary) {
+        this.columnDescriptor = columnDescriptor;
+        this.vector = vector;
+        this.isDictionaryEncoded = isDictionaryEncoded;
+        this.dictionary = dictionary;
+    }
+
+    public ColumnDescriptor getDescriptor() {
+        return columnDescriptor;
+    }
+
+    public FieldVector getVector() {
+        return vector;
+    }
+
+    public boolean isDictionaryEncoded() {
+        return isDictionaryEncoded;
+    }
+
+    public Dictionary getDictionary() {
+        return dictionary;
+    }
+
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
similarity index 75%
rename from parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
index 661097f..f44c5dc 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.*;
@@ -25,6 +25,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.iceberg.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergDecimalArrowVector;
 import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarBinaryArrowVector;
 import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarcharArrowVector;
@@ -37,31 +38,20 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.PrimitiveType;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 /***
- * Creates and allocates space for Arrow field vectors based on Iceberg data type mapped to Arrow type.
- * Iceberg to Arrow Type mapping :
- *   icebergType : LONG       -   Field Vector Type : org.apache.arrow.vector.BigIntVector
- *   icebergType : STRING     -   Field Vector Type : org.apache.arrow.vector.VarCharVector
- *   icebergType : BOOLEAN    -   Field Vector Type : org.apache.arrow.vector.BitVector
- *   icebergType : INTEGER    -   Field Vector Type : org.apache.arrow.vector.IntVector
- *   icebergType : FLOAT      -   Field Vector Type : org.apache.arrow.vector.Float4Vector
- *   icebergType : DOUBLE     -   Field Vector Type : org.apache.arrow.vector.Float8Vector
- *   icebergType : DATE       -   Field Vector Type : org.apache.arrow.vector.DateDayVector
- *   icebergType : TIMESTAMP  -   Field Vector Type : org.apache.arrow.vector.TimeStampMicroTZVector
- *   icebergType : STRING     -   Field Vector Type : org.apache.arrow.vector.VarCharVector
- *   icebergType : BINARY     -   Field Vector Type : org.apache.arrow.vector.VarBinaryVector
- *   icebergField : DECIMAL   -   Field Vector Type : org.apache.arrow.vector.DecimalVector
+ * {@link VectorizedReader} that reads a batch of values into Arrow vectors.
+ * It also takes care of allocating the right kind of Arrow vectors depending on the corresponding
+ * Iceberg/Parquet data types.
  */
-public class VectorReader implements BatchedReader {
+public class VectorizedArrowReader implements VectorizedReader {
   public static final int DEFAULT_BATCH_SIZE = 5000;
   public static final int UNKNOWN_WIDTH = -1;
 
   private final ColumnDescriptor columnDescriptor;
   private final int batchSize;
-  private final BatchedColumnIterator batchedColumnIterator;
+  private final VectorizedColumnIterator vectorizedColumnIterator;
   private final boolean isFixedLengthDecimal;
   private final boolean isVarWidthType;
   private final boolean isFixedWidthBinary;
@@ -86,7 +76,7 @@ public class VectorReader implements BatchedReader {
   // this value if Arrow ends up changing this default.
   private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
 
-  public VectorReader(
+  public VectorizedArrowReader(
       ColumnDescriptor desc,
       Types.NestedField icebergField,
       BufferAllocator rootAlloc,
@@ -104,7 +94,7 @@ public class VectorReader implements BatchedReader {
     this.isLongType = ParquetUtil.isLongType(desc);
     this.isFloatType = ParquetUtil.isFloatType(desc);
     this.isDoubleType = ParquetUtil.isDoubleType(desc);
-    this.batchedColumnIterator = new BatchedColumnIterator(desc, "", batchSize);
+    this.vectorizedColumnIterator = new VectorizedColumnIterator(desc, "", batchSize);
   }
 
   public VectorHolder read(NullabilityHolder nullabilityHolder) {
@@ -112,35 +102,39 @@ public class VectorReader implements BatchedReader {
       typeWidth = allocateFieldVector(rootAlloc, icebergField, columnDescriptor);
     }
     vec.setValueCount(0);
-    if (batchedColumnIterator.hasNext()) {
+    if (vectorizedColumnIterator.hasNext()) {
       if (allPagesDictEncoded) {
-        batchedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
+        vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
       } else {
         if (isFixedLengthDecimal) {
-          batchedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
           ((IcebergDecimalArrowVector) vec).setNullabilityHolder(nullabilityHolder);
         } else if (isFixedWidthBinary) {
-          batchedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth, nullabilityHolder);
+          // Fixed width binary type values are stored in an IcebergVarBinaryArrowVector as well
+          if (vec instanceof IcebergVarBinaryArrowVector) {
+            ((IcebergVarBinaryArrowVector) vec).setNullabilityHolder(nullabilityHolder);
+          }
+          vectorizedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth, nullabilityHolder);
         } else if (isVarWidthType) {
           if (vec instanceof IcebergVarcharArrowVector) {
             ((IcebergVarcharArrowVector) vec).setNullabilityHolder(nullabilityHolder);
           } else if (vec instanceof IcebergVarBinaryArrowVector) {
             ((IcebergVarBinaryArrowVector) vec).setNullabilityHolder(nullabilityHolder);
           }
-          batchedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
         } else if (isBooleanType) {
-          batchedColumnIterator.nextBatchBoolean(vec, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchBoolean(vec, nullabilityHolder);
         } else if (isPaddedDecimal) {
           ((IcebergDecimalArrowVector) vec).setNullabilityHolder(nullabilityHolder);
-          batchedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
         } else if (isIntType) {
-          batchedColumnIterator.nextBatchIntegers(vec, typeWidth, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchIntegers(vec, typeWidth, nullabilityHolder);
         } else if (isLongType) {
-          batchedColumnIterator.nextBatchLongs(vec, typeWidth, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchLongs(vec, typeWidth, nullabilityHolder);
         } else if (isFloatType) {
-          batchedColumnIterator.nextBatchFloats(vec, typeWidth, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchFloats(vec, typeWidth, nullabilityHolder);
         } else if (isDoubleType) {
-          batchedColumnIterator.nextBatchDoubles(vec, typeWidth, nullabilityHolder);
+          vectorizedColumnIterator.nextBatchDoubles(vec, typeWidth, nullabilityHolder);
         }
       }
     }
@@ -252,7 +246,7 @@ public class VectorReader implements BatchedReader {
                               DictionaryPageReadStore dictionaryPageReadStore,
                               Map<ColumnPath, Boolean> columnDictEncoded) {
     allPagesDictEncoded = columnDictEncoded.get(ColumnPath.get(columnDescriptor.getPath()));
-    dictionary = batchedColumnIterator.setRowGroupInfo(source, dictionaryPageReadStore, allPagesDictEncoded);
+    dictionary = vectorizedColumnIterator.setRowGroupInfo(source, dictionaryPageReadStore, allPagesDictEncoded);
   }
 
   @Override
@@ -264,42 +258,5 @@ public class VectorReader implements BatchedReader {
     return batchSize;
   }
 
-  public Types.NestedField getIcebergField() {
-    return icebergField;
-  }
-
-  public static class VectorHolder {
-    private final ColumnDescriptor columnDescriptor;
-    private final FieldVector vector;
-    private final boolean isDictionaryEncoded;
-
-    @Nullable
-    private final Dictionary dictionary;
-
-
-    public VectorHolder(ColumnDescriptor columnDescriptor, FieldVector vector, boolean isDictionaryEncoded, Dictionary dictionary) {
-      this.columnDescriptor = columnDescriptor;
-      this.vector = vector;
-      this.isDictionaryEncoded = isDictionaryEncoded;
-      this.dictionary = dictionary;
-    }
-
-    public ColumnDescriptor getDescriptor() {
-      return columnDescriptor;
-    }
-
-    public FieldVector getVector() {
-      return vector;
-    }
-
-    public boolean isDictionaryEncoded() {
-      return isDictionaryEncoded;
-    }
-
-    public Dictionary getDictionary() {
-      return dictionary;
-    }
-
-  }
 }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
similarity index 76%
rename from parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
index 0a6c28d..fafaec6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
@@ -17,49 +17,49 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.page.*;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.io.ParquetDecodingException;
 
-public class BatchedColumnIterator {
+/**
+ * Vectorized version of the ColumnIterator that reads column values in data pages of a column
+ * in a row group in a batched fashion.
+ */
+public class VectorizedColumnIterator {
 
   private final ColumnDescriptor desc;
-  private final BatchedPageIterator batchedPageIterator;
+  private final VectorizedPageIterator vectorizedPageIterator;
 
   // state reset for each row group
-  private PageReader pageSource = null;
+  private PageReader columnPageReader = null;
   private long totalValuesCount = 0L;
   private long valuesRead = 0L;
   private long advanceNextPageCount = 0L;
   private final int batchSize;
-  private boolean shouldVectorBeDictionaryEncoded;
 
-  public BatchedColumnIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
+  public VectorizedColumnIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
     this.desc = desc;
     this.batchSize = batchSize;
-    this.batchedPageIterator = new BatchedPageIterator(desc, writerVersion, batchSize);
+    this.vectorizedPageIterator = new VectorizedPageIterator(desc, writerVersion, batchSize);
   }
 
   public Dictionary setRowGroupInfo(PageReadStore store,
                                     DictionaryPageReadStore dictionaryPageReadStore,
                                     boolean allPagesDictEncoded) {
-    this.pageSource = store.getPageReader(desc);
-    this.totalValuesCount = pageSource.getTotalValueCount();
+    this.columnPageReader = store.getPageReader(desc);
+    this.totalValuesCount = columnPageReader.getTotalValueCount();
     this.valuesRead = 0L;
     this.advanceNextPageCount = 0L;
-    this.batchedPageIterator.reset();
-    Dictionary dict = readDictionary(desc, dictionaryPageReadStore);
-    this.batchedPageIterator.setDictionary(dict, allPagesDictEncoded);
-    this.shouldVectorBeDictionaryEncoded = allPagesDictEncoded;
+    this.vectorizedPageIterator.reset();
+    Dictionary dict = readDictionaryForColumn(desc, dictionaryPageReadStore);
+    this.vectorizedPageIterator.setDictionaryForColumn(dict, allPagesDictEncoded);
     advance();
     return dict;
   }
@@ -67,11 +67,11 @@ public class BatchedColumnIterator {
   private void advance() {
     if (valuesRead >= advanceNextPageCount) {
       // A parquet page may be empty i.e. contains no values
-      while (!batchedPageIterator.hasNext()) {
-        DataPage page = pageSource.readPage();
+      while (!vectorizedPageIterator.hasNext()) {
+        DataPage page = columnPageReader.readPage();
         if (page != null) {
-          batchedPageIterator.setPage(page);
-          this.advanceNextPageCount += batchedPageIterator.currentPageCount();
+          vectorizedPageIterator.setPage(page);
+          this.advanceNextPageCount += vectorizedPageIterator.currentPageCount();
         } else {
           return;
         }
@@ -90,7 +90,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchIntegers(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchIntegers(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, typeWidth, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -105,7 +105,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchDictionaryIds(vector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchDictionaryIds(vector, batchSize - rowsReadSoFar,
               rowsReadSoFar, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -120,7 +120,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchLongs(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchLongs(fieldVector, batchSize - rowsReadSoFar,
           rowsReadSoFar, typeWidth, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -135,7 +135,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchFloats(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchFloats(fieldVector, batchSize - rowsReadSoFar,
           rowsReadSoFar, typeWidth, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -150,7 +150,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchDoubles(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchDoubles(fieldVector, batchSize - rowsReadSoFar,
           rowsReadSoFar, typeWidth, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -165,7 +165,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -180,7 +180,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -195,7 +195,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchVarWidthType(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchVarWidthType(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -210,7 +210,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -225,7 +225,7 @@ public class BatchedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = batchedPageIterator.nextBatchBoolean(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch = vectorizedPageIterator.nextBatchBoolean(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -233,8 +233,8 @@ public class BatchedColumnIterator {
     }
   }
 
-  private Dictionary readDictionary(ColumnDescriptor desc,
-                                    DictionaryPageReadStore dictionaryPageReadStore) {
+  private Dictionary readDictionaryForColumn(ColumnDescriptor desc,
+                                             DictionaryPageReadStore dictionaryPageReadStore) {
     if (dictionaryPageReadStore == null) {
       return null;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
similarity index 85%
rename from parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
index 99ae8e1..618be17 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
@@ -17,13 +17,14 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
 
 import com.google.common.base.Preconditions;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.iceberg.parquet.BytesReader;
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
@@ -43,19 +44,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-import static java.lang.String.format;
 import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 
-public class BatchedPageIterator {
-    private static final Logger LOG = LoggerFactory.getLogger(BatchedPageIterator.class);
-    private VectorizedValuesReader definitionLevelReader;
-    private boolean eagerDecodeDictionary;
-    private final int batchSize;
-
-    public BatchedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
+public class VectorizedPageIterator {
+    private static final Logger LOG = LoggerFactory.getLogger(VectorizedPageIterator.class);
+    public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
         this.desc = desc;
         this.writerVersion = writerVersion;
-        this.batchSize = batchSize;
     }
 
     private final ColumnDescriptor desc;
@@ -64,18 +59,20 @@ public class BatchedPageIterator {
     // iterator state
     private boolean hasNext = false;
     private int triplesRead = 0;
-    private int currentDL = 0;
-    private int currentRL = 0;
 
     // page bookkeeping
     private Dictionary dict = null;
     private DataPage page = null;
     private int triplesCount = 0;
-    private Encoding valueEncoding = null;
-    private IntIterator definitionLevels = null;
+
+    // Needed once we add support for complex types. Unused for now.
     private IntIterator repetitionLevels = null;
+    private int currentRL = 0;
+
+    private VectorizedParquetValuesReader definitionLevelReader;
+    private boolean eagerDecodeDictionary;
     private BytesReader plainValuesReader = null;
-    private VectorizedValuesReader dictionaryEncodedValuesReader = null;
+    private VectorizedParquetValuesReader dictionaryEncodedValuesReader = null;
     private boolean allPagesDictEncoded;
 
     public void setPage(DataPage page) {
@@ -98,10 +95,8 @@ public class BatchedPageIterator {
     }
 
     // Dictionary is set per row group
-    public void setDictionary(Dictionary dict, boolean allPagesDictEncoded) {
+    public void setDictionaryForColumn(Dictionary dict, boolean allPagesDictEncoded) {
         this.dict = dict;
-        // if all pages are not dictionary encoded, then we eagerly decode the
-        // dictionary encoded data before writing them to the vectors.
         this.allPagesDictEncoded = allPagesDictEncoded;
     }
 
@@ -109,9 +104,9 @@ public class BatchedPageIterator {
         this.page = null;
         this.triplesCount = 0;
         this.triplesRead = 0;
-        this.definitionLevels = null;
         this.repetitionLevels = null;
         this.plainValuesReader = null;
+        this.definitionLevelReader = null;
         this.hasNext = false;
     }
 
@@ -124,7 +119,7 @@ public class BatchedPageIterator {
     }
 
     /**
-     * Method for reading a batch of dictionary ids. Like definition levels, dictionary ids in Parquet are RLE
+     * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like definition levels, dictionary ids in Parquet are RLE
      * encoded as well.
      */
     public int nextBatchDictionaryIds(final IntVector vector, final int expectedBatchSize,
@@ -141,9 +136,7 @@ public class BatchedPageIterator {
     }
 
     /**
-     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
-     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
-     * vector. It appropriately sets the validity buffer in the Arrow vector.
+     * Method for reading a batch of values of INT32 data type
      */
     public int nextBatchIntegers(final FieldVector vector, final int expectedBatchSize,
                                  final int numValsInVector,
@@ -163,9 +156,7 @@ public class BatchedPageIterator {
     }
 
     /**
-     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
-     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
-     * vector. It appropriately sets the validity buffer in the Arrow vector.
+     * Method for reading a batch of values of INT64 data type
      */
     public int nextBatchLongs(final FieldVector vector, final int expectedBatchSize,
                               final int numValsInVector,
@@ -185,9 +176,7 @@ public class BatchedPageIterator {
     }
 
     /**
-     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
-     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
-     * vector. It appropriately sets the validity buffer in the Arrow vector.
+     * Method for reading a batch of values of FLOAT data type.
      */
     public int nextBatchFloats(final FieldVector vector, final int expectedBatchSize,
                                final int numValsInVector,
@@ -207,9 +196,7 @@ public class BatchedPageIterator {
     }
 
     /**
-     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
-     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
-     * vector. It appropriately sets the validity buffer in the Arrow vector.
+     * Method for reading a batch of values of DOUBLE data type
      */
     public int nextBatchDoubles(final FieldVector vector, final int expectedBatchSize,
                                 final int numValsInVector,
@@ -234,7 +221,7 @@ public class BatchedPageIterator {
 
     /**
      * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
-     * Arrow stores all decimals in 16 bytes. This method provides the necessary padding to the decimals read.
+     * Since Arrow stores all decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
      */
     public int nextBatchIntLongBackedDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
                                              final int typeWidth, NullabilityHolder nullabilityHolder) {
@@ -341,7 +328,6 @@ public class BatchedPageIterator {
 
     private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
         ValuesReader previousReader = plainValuesReader;
-        this.valueEncoding = dataEncoding;
         this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded;
         if (dataEncoding.usesDictionary()) {
             if (dict == null) {
@@ -349,7 +335,7 @@ public class BatchedPageIterator {
                         "could not read page in col " + desc + " as the dictionary was missing for encoding " + dataEncoding);
             }
             try {
-                dictionaryEncodedValuesReader = new VectorizedValuesReader(desc.getMaxDefinitionLevel());//(DictionaryValuesReader) dataEncoding.getDictionaryBasedValuesReader(desc, ValuesType.VALUES, dict);
+                dictionaryEncodedValuesReader = new VectorizedParquetValuesReader(desc.getMaxDefinitionLevel());//(DictionaryValuesReader) dataEncoding.getDictionaryBasedValuesReader(desc, ValuesType.VALUES, dict);
                 dictionaryEncodedValuesReader.initFromPage(valueCount, in);
             } catch (IOException e) {
                 throw new ParquetDecodingException("could not read page in col " + desc, e);
@@ -371,11 +357,10 @@ public class BatchedPageIterator {
         ValuesReader rlReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
         ValuesReader dlReader;
         int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
-        this.definitionLevelReader = new VectorizedValuesReader(
+        this.definitionLevelReader = new VectorizedParquetValuesReader(
                 bitWidth, desc.getMaxDefinitionLevel());
         dlReader = this.definitionLevelReader;
         this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
-        this.definitionLevels = new ValuesReaderIntIterator(definitionLevelReader);
         try {
             BytesInput bytes = page.getBytes();
             ByteBufferInputStream in = bytes.toInputStream();
@@ -390,12 +375,11 @@ public class BatchedPageIterator {
     private void initFromPage(DataPageV2 page) {
         this.triplesCount = page.getValueCount();
         this.repetitionLevels = newRLEIterator(desc.getMaxRepetitionLevel(), page.getRepetitionLevels());
-        this.definitionLevels = newRLEIterator(desc.getMaxDefinitionLevel(), page.getDefinitionLevels());
         LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
         try {
             int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
             initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
-            this.definitionLevelReader = new VectorizedValuesReader(bitWidth, false,
+            this.definitionLevelReader = new VectorizedParquetValuesReader(bitWidth, false,
                     desc.getMaxDefinitionLevel());
             definitionLevelReader.initFromPage(triplesCount, page.getDefinitionLevels().toInputStream());
         } catch (IOException e) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
new file mode 100644
index 0000000..5500352
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
@@ -0,0 +1,1233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.vector.*;
+import org.apache.iceberg.parquet.BytesReader;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A values reader for Parquet's run-length encoded data that reads column data in batches
+ * instead of one value at a time.
+ * This is based on the version in Apache Spark, with these changes:
+ * <ul>
+ * <li>Writes batches of retrieved values to Arrow vectors</li>
+ * <li>If not all pages of a column within the row group are dictionary encoded, then
+ * dictionary ids are eagerly decoded into actual values before writing them
+ * to the Arrow vectors</li>
+ * </ul>
+ */
+public final class VectorizedParquetValuesReader extends ValuesReader {
+
+    // Current decoding mode. The encoded data contains groups of either run length encoded data
+    // (RLE) or bit packed data. Each group starts with a header whose low bit selects the kind of
+    // group and whose remaining bits carry the size of the group (see readNextGroup).
+    private enum MODE {
+        // A single value repeated; the header's upper bits hold the repeat count.
+        RLE,
+        // Bit-packed values; the header's upper bits hold the number of 8-value groups.
+        PACKED
+    }
+
+    // Encoded data. initFromPage may replace this with a length-delimited slice of the page stream.
+    private ByteBufferInputStream in;
+
+    // bit/byte width of decoded data and utility to batch unpack them; all three are set by init().
+    private int bitWidth;
+    private int bytesWidth;
+    private BytePacker packer;
+
+    // Current decoding mode and values: the number of values not yet consumed from the current
+    // group, and the repeated value when that group is an RLE run.
+    private MODE mode;
+    private int currentCount;
+    private int currentValue;
+
+    // Buffer of decoded values if the values are PACKED, plus the index of the next unread entry.
+    // readNextGroup replaces the buffer with a larger one when a group does not fit.
+    private int[] packedValuesBuffer = new int[16];
+    private int packedValuesBufferIdx = 0;
+
+    // If true, the bit width is fixed (supplied to the constructor). This decoder is used in
+    // different places; when false, the bit width is read from the first byte of the page.
+    private final boolean fixedWidth;
+    // If true, initFromPage reads a 4 byte little endian length prefix and decodes only that slice.
+    private final boolean readLength;
+    // Definition level that marks a value as present; any other level is written as a null.
+    private final int maxDefLevel;
+
+    /**
+     * Creates a reader whose bit width is not known up front; {@link #initFromPage} reads it from
+     * the first byte of the page.
+     *
+     * @param maxDefLevel definition level that marks a value as non-null
+     */
+    public VectorizedParquetValuesReader(int maxDefLevel) {
+        this.fixedWidth = false;
+        this.readLength = false;
+        this.maxDefLevel = maxDefLevel;
+    }
+
+    /**
+     * Creates a fixed-width reader. A length prefix is expected at the start of the page unless
+     * {@code bitWidth} is zero.
+     *
+     * @param bitWidth number of bits per encoded value, between 0 and 32
+     * @param maxDefLevel definition level that marks a value as non-null
+     */
+    public VectorizedParquetValuesReader(
+            int bitWidth,
+            int maxDefLevel) {
+        this(bitWidth, bitWidth != 0, maxDefLevel);
+    }
+
+    /**
+     * Creates a fixed-width reader with explicit control over the length prefix.
+     *
+     * @param bitWidth number of bits per encoded value, between 0 and 32
+     * @param readLength whether {@link #initFromPage} should read a 4 byte length prefix
+     * @param maxDefLevel definition level that marks a value as non-null
+     */
+    public VectorizedParquetValuesReader(
+            int bitWidth,
+            boolean readLength,
+            int maxDefLevel) {
+        this.maxDefLevel = maxDefLevel;
+        this.readLength = readLength;
+        this.fixedWidth = true;
+        init(bitWidth);
+    }
+
+    /**
+     * Points this reader at a new page and resets the run state.
+     *
+     * @param valueCount number of values in the page; used as the run length when the bit width is 0
+     * @param in stream positioned at the start of the encoded data
+     * @throws IOException if the length prefix or bit width cannot be read
+     */
+    @Override
+    public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+        this.in = in;
+        if (!fixedWidth) {
+            // values: the first byte of the page carries the bit width
+            if (in.available() > 0) {
+                init(in.read());
+            }
+        } else if (readLength) {
+            // repetition and definition levels: restrict decoding to the length-prefixed slice
+            int sliceLength = readIntLittleEndian();
+            this.in = in.sliceStream(sliceLength);
+        }
+        if (bitWidth != 0) {
+            currentCount = 0;
+            return;
+        }
+        // 0 bit width, treat this as an RLE run of valueCount number of 0's.
+        mode = MODE.RLE;
+        currentCount = valueCount;
+        currentValue = 0;
+    }
+
+
+    /**
+     * Sets up the decoding state (bit width, padded byte width and unpacker) for ints that are
+     * {@code width} bits wide.
+     *
+     * @param width number of bits per value; must be between 0 and 32 inclusive
+     */
+    private void init(int width) {
+        boolean inRange = width >= 0 && width <= 32;
+        Preconditions.checkArgument(inRange, "bitWidth must be >= 0 and <= 32");
+        bitWidth = width;
+        bytesWidth = BytesUtils.paddedByteCountFromBits(width);
+        packer = Packer.LITTLE_ENDIAN.newBytePacker(width);
+    }
+
+    /**
+     * Reads the next varint encoded int: 7 payload bits per byte, least significant group first,
+     * with the high bit of each byte flagging that another byte follows.
+     */
+    private int readUnsignedVarInt() throws IOException {
+        int result = 0;
+        for (int offset = 0; ; offset += 7) {
+            int next = in.read();
+            result |= (next & 0x7F) << offset;
+            if ((next & 0x80) == 0) {
+                return result;
+            }
+        }
+    }
+
+    /**
+     * Reads the next 4 bytes and combines them into an int, least significant byte first.
+     */
+    private int readIntLittleEndian() throws IOException {
+        int b0 = in.read();
+        int b1 = in.read();
+        int b2 = in.read();
+        int b3 = in.read();
+        return b0 + (b1 << 8) + (b2 << 16) + (b3 << 24);
+    }
+
+    /**
+     * Reads the next {@code bytesWidth} bytes (0 to 4) as a little endian int.
+     */
+    private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
+        if (bytesWidth < 0 || bytesWidth > 4) {
+            throw new RuntimeException("Unreachable");
+        }
+        int value = 0;
+        // least significant byte comes first in the stream
+        for (int byteIdx = 0; byteIdx < bytesWidth; byteIdx++) {
+            value += in.read() << (8 * byteIdx);
+        }
+        return value;
+    }
+
+    /**
+     * Reads the next group.
+     */
+    private void readNextGroup() {
+        try {
+            int header = readUnsignedVarInt();
+            this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+            switch (mode) {
+                case RLE:
+                    this.currentCount = header >>> 1;
+                    this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+                    return;
+                case PACKED:
+                    int numGroups = header >>> 1;
+                    this.currentCount = numGroups * 8;
+
+                    if (this.packedValuesBuffer.length < this.currentCount) {
+                        this.packedValuesBuffer = new int[this.currentCount];
+                    }
+                    packedValuesBufferIdx = 0;
+                    int valueIndex = 0;
+                    while (valueIndex < this.currentCount) {
+                        // values are bit packed 8 at a time, so reading bitWidth will always work
+                        ByteBuffer buffer = in.slice(bitWidth);
+                        this.packer.unpack8Values(buffer, buffer.position(), this.packedValuesBuffer, valueIndex);
+                        valueIndex += 8;
+                    }
+                    return;
+                default:
+                    throw new ParquetDecodingException("not a valid mode " + this.mode);
+            }
+        } catch (IOException e) {
+            throw new ParquetDecodingException("Failed to read from input stream", e);
+        }
+    }
+
+    @Override
+    public boolean readBoolean() {
+        return this.readInteger() != 0;
+    }
+
+    /** Skips one value by decoding it and discarding the result. */
+    @Override
+    public void skip() {
+        readInteger();
+    }
+
+    @Override
+    public int readValueDictionaryId() {
+        return readInteger();
+    }
+
+    /**
+     * Returns the next decoded value, loading the next group first when the current one is
+     * exhausted.
+     */
+    @Override
+    public int readInteger() {
+        if (currentCount == 0) {
+            readNextGroup();
+        }
+        currentCount--;
+        switch (mode) {
+            case RLE:
+                return currentValue;
+            case PACKED:
+                return packedValuesBuffer[packedValuesBufferIdx++];
+            default:
+                throw new RuntimeException("Unreachable");
+        }
+    }
+
+    /**
+     * Decodes {@code batchSize} definition levels from this reader. For every level equal to the
+     * max definition level the next dictionary id from {@code dictionaryEncodedValuesReader} is
+     * written to {@code vector}; every other position is marked null.
+     *
+     * @param vector destination vector of dictionary ids
+     * @param numValsInVector index in {@code vector} at which writing starts
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to setNull/setNulls for null positions
+     * @param dictionaryEncodedValuesReader reader that supplies the dictionary ids
+     */
+    public void readBatchOfDictionaryIds(
+            final IntVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader) {
+        int writeIdx = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (currentCount == 0) {
+                readNextGroup();
+            }
+            final int count = Math.min(remaining, currentCount);
+            switch (mode) {
+                case RLE:
+                    // the whole run shares one definition level
+                    if (currentValue != maxDefLevel) {
+                        setNulls(nullabilityHolder, writeIdx, count, vector.getValidityBuffer());
+                    } else {
+                        dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, writeIdx, count);
+                    }
+                    writeIdx += count;
+                    break;
+                case PACKED:
+                    for (final int end = writeIdx + count; writeIdx < end; writeIdx++) {
+                        int defLevel = packedValuesBuffer[packedValuesBufferIdx++];
+                        if (defLevel != maxDefLevel) {
+                            setNull(nullabilityHolder, writeIdx, vector.getValidityBuffer());
+                        } else {
+                            vector.set(writeIdx, dictionaryEncodedValuesReader.readInteger());
+                        }
+                    }
+                    break;
+            }
+            remaining -= count;
+            currentCount -= count;
+        }
+    }
+
+    // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
+    // check definition level.
+    private void readDictionaryIdsInternal(final IntVector c, final int numValsInVector, final int numValuesToRead) {
+        int left = numValuesToRead;
+        int idx = numValsInVector;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    for (int i = 0; i < n; i++) {
+                        c.set(idx, currentValue);
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        c.set(idx, packedValuesBuffer[packedValuesBufferIdx]);
+                        packedValuesBufferIdx++;
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector}, starting at {@code numValsInVector},
+     * driven by the definition levels decoded from this reader. RLE runs are handed to
+     * setNextNValuesInVector as a whole; bit-packed levels are handled one position at a time,
+     * copying {@code typeWidth} bytes from {@code valuesReader} for non-null positions.
+     *
+     * @param vector destination vector
+     * @param numValsInVector index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to the null-marking helpers
+     * @param valuesReader source of the plain-encoded value bytes
+     */
+    public void readBatchOfIntegers(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+        int writeIdx = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (currentCount == 0) {
+                readNextGroup();
+            }
+            final int count = Math.min(remaining, currentCount);
+            switch (mode) {
+                case RLE:
+                    setNextNValuesInVector(typeWidth, maxDefLevel, nullabilityHolder, valuesReader, writeIdx, vector, count);
+                    writeIdx += count;
+                    break;
+                case PACKED:
+                    for (final int end = writeIdx + count; writeIdx < end; writeIdx++) {
+                        int defLevel = packedValuesBuffer[packedValuesBufferIdx++];
+                        if (defLevel != maxDefLevel) {
+                            setNull(nullabilityHolder, writeIdx, vector.getValidityBuffer());
+                        } else {
+                            setValue(typeWidth, valuesReader, writeIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+                        }
+                    }
+                    break;
+            }
+            remaining -= count;
+            currentCount -= count;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector} with dictionary-decoded ints, driven by
+     * the definition levels decoded from this reader. Positions whose level differs from the max
+     * definition level are marked null.
+     *
+     * @param vector destination vector
+     * @param numValsInVector element index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to setNull/setNulls for null positions
+     * @param valuesReader reader that supplies the dictionary ids
+     * @param dict dictionary used to turn ids into int values
+     */
+    public void readBatchOfDictionaryEncodedIntegers(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+        int idx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
+                    } else {
+                        setNulls(nullabilityHolder, idx, n, validityBuffer);
+                    }
+                    idx += n;
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            // ArrowBuf setters take a byte offset, so scale the element index by the
+                            // value width (as setValue does) instead of passing idx directly.
+                            vector.getDataBuffer().setInt(idx * typeWidth, dict.decodeToInt(valuesReader.readInteger()));
+                            // Mark the slot valid, matching the plain (non-dictionary) write path.
+                            BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        } else {
+                            setNull(nullabilityHolder, idx, validityBuffer);
+                        }
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Decodes {@code numValuesToRead} dictionary ids from this reader and writes the matching int
+     * values into {@code vector}, starting at element {@code idx}. Definition levels are not
+     * checked here; every written slot is marked valid.
+     *
+     * @param typeWidth width in bytes of one value in the data buffer
+     */
+    private void readBatchOfDictionaryEncodedIntegersInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf dataBuffer = vector.getDataBuffer();
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    for (int i = 0; i < n; i++) {
+                        // ArrowBuf setters take a byte offset, not an element index.
+                        dataBuffer.setInt(idx * typeWidth, dict.decodeToInt(currentValue));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        dataBuffer.setInt(idx * typeWidth, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector}, starting at {@code numValsInVector},
+     * driven by the definition levels decoded from this reader. RLE runs are handed to
+     * setNextNValuesInVector as a whole; bit-packed levels are handled one position at a time,
+     * copying {@code typeWidth} bytes from {@code valuesReader} for non-null positions.
+     *
+     * @param vector destination vector
+     * @param numValsInVector index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to the null-marking helpers
+     * @param valuesReader source of the plain-encoded value bytes
+     */
+    public void readBatchOfLongs(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+        int writeIdx = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (currentCount == 0) {
+                readNextGroup();
+            }
+            final int count = Math.min(remaining, currentCount);
+            switch (mode) {
+                case RLE:
+                    setNextNValuesInVector(typeWidth, maxDefLevel, nullabilityHolder, valuesReader, writeIdx, vector, count);
+                    writeIdx += count;
+                    break;
+                case PACKED:
+                    for (final int end = writeIdx + count; writeIdx < end; writeIdx++) {
+                        int defLevel = packedValuesBuffer[packedValuesBufferIdx++];
+                        if (defLevel != maxDefLevel) {
+                            setNull(nullabilityHolder, writeIdx, vector.getValidityBuffer());
+                        } else {
+                            setValue(typeWidth, valuesReader, writeIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+                        }
+                    }
+                    break;
+            }
+            remaining -= count;
+            currentCount -= count;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector} with dictionary-decoded longs, driven by
+     * the definition levels decoded from this reader. Positions whose level differs from the max
+     * definition level are marked null.
+     *
+     * @param vector destination vector
+     * @param numValsInVector element index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to setNull/setNulls for null positions
+     * @param valuesReader reader that supplies the dictionary ids
+     * @param dict dictionary used to turn ids into long values
+     */
+    public void readBatchOfDictionaryEncodedLongs(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+        int idx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, n, dict);
+                    } else {
+                        setNulls(nullabilityHolder, idx, n, validityBuffer);
+                    }
+                    idx += n;
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            // ArrowBuf setters take a byte offset, so scale the element index by the
+                            // value width (as setValue does) instead of passing idx directly.
+                            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(valuesReader.readInteger()));
+                            // Mark the slot valid, matching the plain (non-dictionary) write path.
+                            BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        } else {
+                            setNull(nullabilityHolder, idx, validityBuffer);
+                        }
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Decodes {@code numValuesToRead} dictionary ids from this reader and writes the matching long
+     * values into {@code vector}, starting at element {@code idx}. Definition levels are not
+     * checked here; every written slot is marked valid.
+     *
+     * @param typeWidth width in bytes of one value in the data buffer
+     */
+    private void readBatchOfDictionaryEncodedLongsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf dataBuffer = vector.getDataBuffer();
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    for (int i = 0; i < n; i++) {
+                        // ArrowBuf setters take a byte offset, not an element index.
+                        dataBuffer.setLong(idx * typeWidth, dict.decodeToLong(currentValue));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        dataBuffer.setLong(idx * typeWidth, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector}, starting at {@code numValsInVector},
+     * driven by the definition levels decoded from this reader. RLE runs are handed to
+     * setNextNValuesInVector as a whole; bit-packed levels are handled one position at a time,
+     * copying {@code typeWidth} bytes from {@code valuesReader} for non-null positions.
+     *
+     * @param vector destination vector
+     * @param numValsInVector index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to the null-marking helpers
+     * @param valuesReader source of the plain-encoded value bytes
+     */
+    public void readBatchOfFloats(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+        int writeIdx = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (currentCount == 0) {
+                readNextGroup();
+            }
+            final int count = Math.min(remaining, currentCount);
+            switch (mode) {
+                case RLE:
+                    setNextNValuesInVector(typeWidth, maxDefLevel, nullabilityHolder, valuesReader, writeIdx, vector, count);
+                    writeIdx += count;
+                    break;
+                case PACKED:
+                    for (final int end = writeIdx + count; writeIdx < end; writeIdx++) {
+                        int defLevel = packedValuesBuffer[packedValuesBufferIdx++];
+                        if (defLevel != maxDefLevel) {
+                            setNull(nullabilityHolder, writeIdx, vector.getValidityBuffer());
+                        } else {
+                            setValue(typeWidth, valuesReader, writeIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+                        }
+                    }
+                    break;
+            }
+            remaining -= count;
+            currentCount -= count;
+        }
+    }
+
+    /**
+     * Copies the next {@code typeWidth} bytes from {@code valuesReader} into slot {@code bufferIdx}
+     * of the data buffer and marks that slot as valid.
+     *
+     * @param typeWidth width in bytes of one value
+     * @param valuesReader source of the value bytes
+     * @param bufferIdx element index of the slot to write
+     * @param validityBuffer validity bitmap of the destination vector
+     * @param dataBuffer data buffer of the destination vector
+     */
+    private void setValue(int typeWidth, BytesReader valuesReader, int bufferIdx, ArrowBuf validityBuffer, ArrowBuf dataBuffer) {
+        // The data buffer is addressed in bytes, hence the scaling by typeWidth.
+        dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
+        BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx);
+        // Callers advance their own index; incrementing the by-value parameter here had no effect.
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector} with dictionary-decoded floats, driven by
+     * the definition levels decoded from this reader. Positions whose level differs from the max
+     * definition level are marked null.
+     *
+     * @param vector destination vector
+     * @param numValsInVector element index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to setNull/setNulls for null positions
+     * @param valuesReader reader that supplies the dictionary ids
+     * @param dict dictionary used to turn ids into float values
+     */
+    public void readBatchOfDictionaryEncodedFloats(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+        int idx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
+                    } else {
+                        setNulls(nullabilityHolder, idx, n, validityBuffer);
+                    }
+                    idx += n;
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            // ArrowBuf setters take a byte offset, so scale the element index by the
+                            // value width (as setValue does) instead of passing idx directly.
+                            vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(valuesReader.readInteger()));
+                            // Mark the slot valid, matching the plain (non-dictionary) write path.
+                            BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        } else {
+                            setNull(nullabilityHolder, idx, validityBuffer);
+                        }
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Decodes {@code numValuesToRead} dictionary ids from this reader and writes the matching float
+     * values into {@code vector}, starting at element {@code idx}. Definition levels are not
+     * checked here; every written slot is marked valid.
+     *
+     * @param typeWidth width in bytes of one value in the data buffer
+     */
+    private void readBatchOfDictionaryEncodedFloatsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf dataBuffer = vector.getDataBuffer();
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    for (int i = 0; i < n; i++) {
+                        // ArrowBuf setters take a byte offset, not an element index.
+                        dataBuffer.setFloat(idx * typeWidth, dict.decodeToFloat(currentValue));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        dataBuffer.setFloat(idx * typeWidth, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector}, starting at {@code numValsInVector},
+     * driven by the definition levels decoded from this reader. RLE runs are handed to
+     * setNextNValuesInVector as a whole; bit-packed levels are handled one position at a time,
+     * copying {@code typeWidth} bytes from {@code valuesReader} for non-null positions.
+     *
+     * @param vector destination vector
+     * @param numValsInVector index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to the null-marking helpers
+     * @param valuesReader source of the plain-encoded value bytes
+     */
+    public void readBatchOfDoubles(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+            BytesReader valuesReader) {
+        int writeIdx = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (currentCount == 0) {
+                readNextGroup();
+            }
+            final int count = Math.min(remaining, currentCount);
+            switch (mode) {
+                case RLE:
+                    setNextNValuesInVector(typeWidth, maxDefLevel, nullabilityHolder, valuesReader, writeIdx, vector, count);
+                    writeIdx += count;
+                    break;
+                case PACKED:
+                    for (final int end = writeIdx + count; writeIdx < end; writeIdx++) {
+                        int defLevel = packedValuesBuffer[packedValuesBufferIdx++];
+                        if (defLevel != maxDefLevel) {
+                            setNull(nullabilityHolder, writeIdx, vector.getValidityBuffer());
+                        } else {
+                            setValue(typeWidth, valuesReader, writeIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+                        }
+                    }
+                    break;
+            }
+            remaining -= count;
+            currentCount -= count;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector} with dictionary-decoded doubles, driven
+     * by the definition levels decoded from this reader. Positions whose level differs from the max
+     * definition level are marked null.
+     *
+     * @param vector destination vector
+     * @param numValsInVector element index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to setNull/setNulls for null positions
+     * @param valuesReader reader that supplies the dictionary ids
+     * @param dict dictionary used to turn ids into double values
+     */
+    public void readBatchOfDictionaryEncodedDoubles(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+        int idx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
+                    } else {
+                        setNulls(nullabilityHolder, idx, n, validityBuffer);
+                    }
+                    idx += n;
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            // ArrowBuf setters take a byte offset, so scale the element index by the
+                            // value width (as setValue does) instead of passing idx directly.
+                            vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(valuesReader.readInteger()));
+                            // Mark the slot valid, matching the plain (non-dictionary) write path.
+                            BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        } else {
+                            setNull(nullabilityHolder, idx, validityBuffer);
+                        }
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Decodes {@code numValuesToRead} dictionary ids from this reader and writes the matching
+     * double values into {@code vector}, starting at element {@code idx}. Definition levels are not
+     * checked here; every written slot is marked valid.
+     *
+     * @param typeWidth width in bytes of one value in the data buffer
+     */
+    private void readBatchOfDictionaryEncodedDoublesInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf dataBuffer = vector.getDataBuffer();
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    for (int i = 0; i < n; i++) {
+                        // ArrowBuf setters take a byte offset, not an element index.
+                        dataBuffer.setDouble(idx * typeWidth, dict.decodeToDouble(currentValue));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        dataBuffer.setDouble(idx * typeWidth, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
+                        BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector} (cast to {@link VarBinaryVector}),
+     * starting at {@code numValsInVector}, driven by the definition levels decoded from this
+     * reader. Non-null positions are written through setBinaryInVector; the rest are marked null.
+     *
+     * @param vector destination vector; must be a {@link VarBinaryVector}
+     * @param numValsInVector index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to setNull/setNulls for null positions
+     * @param valuesReader source of the value bytes
+     */
+    public void readBatchOfFixedWidthBinary(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+            BytesReader valuesReader) {
+        int writeIdx = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (currentCount == 0) {
+                readNextGroup();
+            }
+            final int count = Math.min(remaining, currentCount);
+            switch (mode) {
+                case RLE:
+                    if (currentValue != maxDefLevel) {
+                        setNulls(nullabilityHolder, writeIdx, count, vector.getValidityBuffer());
+                        writeIdx += count;
+                    } else {
+                        for (final int end = writeIdx + count; writeIdx < end; writeIdx++) {
+                            setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, writeIdx);
+                        }
+                    }
+                    break;
+                case PACKED:
+                    for (final int end = writeIdx + count; writeIdx < end; writeIdx++) {
+                        int defLevel = packedValuesBuffer[packedValuesBufferIdx++];
+                        if (defLevel != maxDefLevel) {
+                            setNull(nullabilityHolder, writeIdx, vector.getValidityBuffer());
+                        } else {
+                            setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, writeIdx);
+                        }
+                    }
+                    break;
+            }
+            remaining -= count;
+            currentCount -= count;
+        }
+    }
+
+    /**
+     * Fills {@code batchSize} positions of {@code vector} with dictionary-decoded fixed-width
+     * binary values, driven by the definition levels decoded from this reader. Positions whose
+     * level differs from the max definition level are marked null.
+     *
+     * @param vector destination vector
+     * @param numValsInVector element index in {@code vector} at which writing starts
+     * @param typeWidth width in bytes of one value in the data buffer
+     * @param batchSize number of positions (values plus nulls) to fill
+     * @param nullabilityHolder passed to setNull/setNulls for null positions
+     * @param valuesReader reader that supplies the dictionary ids
+     * @param dict dictionary used to turn ids into binary values
+     */
+    public void readBatchOfDictionaryEncodedFixedWidthBinary(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+        int idx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            ArrowBuf validityBuffer = vector.getValidityBuffer();
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
+                    } else {
+                        setNulls(nullabilityHolder, idx, n, validityBuffer);
+                    }
+                    idx += n;
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
+                            // Mark the slot valid, as the RLE path does through the internal reader.
+                            BitVectorHelper.setValidityBitToOne(validityBuffer, idx);
+                        } else {
+                            setNull(nullabilityHolder, idx, validityBuffer);
+                        }
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) this.readNextGroup();
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    for (int i = 0; i < n; i++) {
+                        vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
+                        BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+                        BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Reads a batch of decimals stored as fixed length byte arrays into a {@link DecimalVector}.
+     * <p>
+     * This reader supplies one level per slot. A level equal to {@code maxDefLevel} marks a present value,
+     * which is read as {@code typeWidth} bytes from {@code valuesReader}; any other level marks a null.
+     *
+     * @param vector            target vector; must be a {@link DecimalVector}
+     * @param numValsInVector   index of the first slot to fill
+     * @param typeWidth         number of bytes each stored decimal occupies
+     * @param batchSize         number of slots (values and nulls together) to fill
+     * @param nullabilityHolder receives the index of every null slot
+     * @param valuesReader      reader that yields the raw bytes of the present values
+     */
+    public void readBatchOfFixedLengthDecimals(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+            BytesReader valuesReader) {
+        int bufferIdx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        for (int i = 0; i < n; i++) {
+                            byte[] byteArray = readSignExtendedDecimalBytes(valuesReader, typeWidth);
+                            ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+                            bufferIdx++;
+                        }
+                    } else {
+                        setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+                        bufferIdx += n;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            byte[] byteArray = readSignExtendedDecimalBytes(valuesReader, typeWidth);
+                            ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+                        } else {
+                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+                        }
+                        bufferIdx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Reads one {@code typeWidth}-byte big-endian decimal and widens it to {@link DecimalVector#TYPE_WIDTH}
+     * bytes. The value is two's complement, so the leading pad bytes must repeat the sign: zero padding
+     * would turn every negative decimal narrower than the vector width into a large positive number.
+     */
+    private byte[] readSignExtendedDecimalBytes(BytesReader valuesReader, int typeWidth) {
+        byte[] padded = new byte[DecimalVector.TYPE_WIDTH];
+        int offset = DecimalVector.TYPE_WIDTH - typeWidth;
+        valuesReader.getBuffer(typeWidth).get(padded, offset, typeWidth);
+        // padded[offset] is the most significant stored byte; a set top bit means the value is negative.
+        if (typeWidth > 0 && padded[offset] < 0) {
+            java.util.Arrays.fill(padded, 0, offset, (byte) 0xFF);
+        }
+        return padded;
+    }
+
+    /**
+     * Reads a batch of dictionary encoded decimals, whose dictionary entries are fixed length byte arrays,
+     * into a {@link DecimalVector}.
+     * <p>
+     * This reader supplies one level per slot. A level equal to {@code maxDefLevel} marks a present value,
+     * whose dictionary id is taken from {@code valuesReader} and decoded through {@code dict}; any other
+     * level marks a null.
+     *
+     * @param vector            target vector; must be a {@link DecimalVector}
+     * @param numValsInVector   index of the first slot to fill
+     * @param typeWidth         number of bytes each stored decimal occupies
+     * @param batchSize         number of slots (values and nulls together) to fill
+     * @param nullabilityHolder receives the index of every null slot
+     * @param valuesReader      reader that yields the dictionary ids of the present values
+     * @param dict              dictionary used to turn an id into its binary value
+     */
+    public void readBatchOfDictionaryEncodedFixedLengthDecimals(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+        int idx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
+                    } else {
+                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+                    }
+                    idx += n;
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
+                            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+                            int offset = DecimalVector.TYPE_WIDTH - typeWidth;
+                            System.arraycopy(decimalBytes, 0, vectorBytes, offset, typeWidth);
+                            // The bytes are big-endian two's complement: repeat the sign in the pad bytes so
+                            // that a negative decimal narrower than the vector width stays negative.
+                            if (typeWidth > 0 && vectorBytes[offset] < 0) {
+                                java.util.Arrays.fill(vectorBytes, 0, offset, (byte) 0xFF);
+                            }
+                            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+                        } else {
+                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+                        }
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Decodes {@code numValuesToRead} dictionary ids from this reader into fixed length decimals and stores
+     * them in consecutive slots of {@code vector} (a {@link DecimalVector}), starting at slot {@code idx}.
+     */
+    private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    // In a run the same id (currentValue) is decoded for every slot.
+                    for (int i = 0; i < n; i++) {
+                        byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
+                        ((DecimalVector) vector).setBigEndian(idx, widenDictionaryDecimal(decimalBytes, typeWidth));
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    // In packed mode each slot takes the next id from packedValuesBuffer.
+                    for (int i = 0; i < n; i++) {
+                        byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
+                        ((DecimalVector) vector).setBigEndian(idx, widenDictionaryDecimal(decimalBytes, typeWidth));
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Widens a big-endian two's-complement decimal of {@code typeWidth} bytes to
+     * {@link DecimalVector#TYPE_WIDTH} bytes. The pad bytes repeat the sign; zero padding would turn every
+     * negative decimal narrower than the vector width into a large positive number.
+     */
+    private byte[] widenDictionaryDecimal(byte[] decimalBytes, int typeWidth) {
+        // TODO: samarth this still assumes that decimalBytes holds at least typeWidth bytes
+        byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+        int offset = DecimalVector.TYPE_WIDTH - typeWidth;
+        System.arraycopy(decimalBytes, 0, vectorBytes, offset, typeWidth);
+        // vectorBytes[offset] is the most significant stored byte; a set top bit means negative.
+        if (typeWidth > 0 && vectorBytes[offset] < 0) {
+            java.util.Arrays.fill(vectorBytes, 0, offset, (byte) 0xFF);
+        }
+        return vectorBytes;
+    }
+
+    /**
+     * Method for reading a batch of variable width values. Each present value is read from the values
+     * reader as an int length followed by that many bytes and is appended to the Arrow vector through
+     * {@code setVarWidthBinaryValue}. Slots whose level differs from {@code maxDefLevel} are recorded as
+     * nulls in both the nullability holder and the vector's validity buffer.
+     */
+    public void readBatchVarWidth(
+            final FieldVector vector,
+            final int numValsInVector,
+            final int batchSize,
+            NullabilityHolder nullabilityHolder,
+            BytesReader valuesReader) {
+        int slot = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            final int chunk = Math.min(remaining, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    // One level covers the whole run: either chunk nulls or chunk values.
+                    if (currentValue != maxDefLevel) {
+                        setNulls(nullabilityHolder, slot, chunk, vector.getValidityBuffer());
+                        slot += chunk;
+                    } else {
+                        for (int end = slot + chunk; slot < end; slot++) {
+                            setVarWidthBinaryValue(vector, valuesReader, slot);
+                        }
+                    }
+                    break;
+                case PACKED:
+                    // Every slot has its own level in packedValuesBuffer.
+                    for (int end = slot + chunk; slot < end; slot++) {
+                        final boolean present = packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel;
+                        if (present) {
+                            setVarWidthBinaryValue(vector, valuesReader, slot);
+                        } else {
+                            setNull(nullabilityHolder, slot, vector.getValidityBuffer());
+                        }
+                    }
+                    break;
+            }
+            remaining -= chunk;
+            this.currentCount -= chunk;
+        }
+    }
+
+    /**
+     * Reads one length-prefixed value from {@code valuesReader} and appends it to {@code vector} at slot
+     * {@code bufferIdx}: an int length is read first, then that many bytes.
+     */
+    private void setVarWidthBinaryValue(FieldVector vector, BytesReader valuesReader, int bufferIdx) {
+        int len = valuesReader.readInteger();
+        ByteBuffer buffer = valuesReader.getBuffer(len);
+        // Calling setValueLengthSafe takes care of allocating a larger buffer if
+        // running out of space.
+        ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
+        // It is possible that the data buffer was reallocated. So it is important to
+        // not cache the data buffer reference but instead use vector.getDataBuffer().
+        // position() is relative to the buffer, not to its backing array: arrayOffset() must be added,
+        // otherwise the wrong bytes are copied whenever the buffer is a slice of a larger array.
+        vector.getDataBuffer().writeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        // Similarly, we need to get the latest reference to the validity buffer as well
+        // since reallocation changes reference of the validity buffers as well.
+        BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+    }
+
+    /**
+     * Reads a batch of dictionary encoded variable width values into {@code vector}.
+     * <p>
+     * This reader supplies one level per slot. A level equal to {@code maxDefLevel} marks a present value,
+     * whose dictionary id comes from {@code dictionaryEncodedValuesReader} and is decoded through
+     * {@code dict}; any other level marks a null.
+     */
+    public void readBatchOfDictionaryEncodedVarWidth(
+            final FieldVector vector, final int numValsInVector,
+            final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader, Dictionary dict) {
+        int slot = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            final int chunk = Math.min(remaining, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    // One level covers the whole run: either chunk nulls or chunk values.
+                    if (currentValue != maxDefLevel) {
+                        setNulls(nullabilityHolder, slot, chunk, vector.getValidityBuffer());
+                    } else {
+                        dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, slot, chunk, dict);
+                    }
+                    slot += chunk;
+                    break;
+                case PACKED:
+                    // Every slot has its own level in packedValuesBuffer.
+                    for (int end = slot + chunk; slot < end; slot++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] != maxDefLevel) {
+                            setNull(nullabilityHolder, slot, vector.getValidityBuffer());
+                            continue;
+                        }
+                        final int dictionaryId = dictionaryEncodedValuesReader.readInteger();
+                        ((BaseVariableWidthVector) vector).setSafe(slot, dict.decodeToBinary(dictionaryId).getBytesUnsafe());
+                    }
+                    break;
+            }
+            remaining -= chunk;
+            this.currentCount -= chunk;
+        }
+    }
+
+    private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(FieldVector vector, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) this.readNextGroup();
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    for (int i = 0; i < n; i++) {
+                        ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Reads a batch of decimals that are stored as {@code typeWidth}-byte integers and writes them straight
+     * into the data buffer of {@code vector}, one {@link DecimalVector#TYPE_WIDTH}-byte slot per value.
+     * <p>
+     * This reader supplies one level per slot. A level equal to {@code maxDefLevel} marks a present value,
+     * which is read from {@code valuesReader}; any other level marks a null. Both encodings of the levels
+     * keep the validity buffer in sync: present slots are flagged valid and null slots are cleared.
+     *
+     * @param vector            vector whose data and validity buffers are written
+     * @param numValsInVector   index of the first slot to fill
+     * @param typeWidth         number of bytes each stored value occupies
+     * @param batchSize         number of slots (values and nulls together) to fill
+     * @param nullabilityHolder receives the index of every null slot
+     * @param valuesReader      reader that yields the raw bytes of the present values
+     */
+    public void readBatchOfIntLongBackedDecimals(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+            BytesReader valuesReader) {
+        int bufferIdx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        for (int i = 0; i < n; i++) {
+                            writeIntLongBackedDecimal(vector, typeWidth, valuesReader, bufferIdx);
+                            bufferIdx++;
+                        }
+                    } else {
+                        setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+                        bufferIdx += n;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            writeIntLongBackedDecimal(vector, typeWidth, valuesReader, bufferIdx);
+                        } else {
+                            // Clear the validity bit as well, like every other null path of this reader.
+                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+                        }
+                        bufferIdx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Reads one {@code typeWidth}-byte value, widens it to {@link DecimalVector#TYPE_WIDTH} bytes, stores it
+     * in slot {@code bufferIdx} of the data buffer and flags the slot as valid.
+     */
+    private void writeIntLongBackedDecimal(FieldVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
+        byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+        valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+        // NOTE(review): assumes the stored bytes are a little-endian two's-complement integer, which the
+        // original low-byte-first copy already relied on. The high pad bytes must then repeat the sign,
+        // otherwise negative decimals are stored as large positive numbers.
+        if (typeWidth > 0 && typeWidth < DecimalVector.TYPE_WIDTH && byteArray[typeWidth - 1] < 0) {
+            java.util.Arrays.fill(byteArray, typeWidth, DecimalVector.TYPE_WIDTH, (byte) 0xFF);
+        }
+        vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+        BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+    }
+
+    public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
+            final FieldVector vector, final int numValsInVector,
+            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+        int idx = numValsInVector;
+        int left = batchSize;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    if (currentValue == maxDefLevel) {
+                        valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
+                    } else {
+                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+                    }
+                    idx += n;
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+                            ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(valuesReader.readInteger()) : dict.decodeToLong(valuesReader.readInteger())));
+                        } else {
+                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+                        }
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Decodes {@code numValuesToRead} dictionary ids from this reader into int backed (when
+     * {@code typeWidth == Integer.BYTES}) or long backed decimals and stores them in consecutive slots of
+     * {@code vector} (a {@link DecimalVector}), starting at slot {@code idx}.
+     */
+    private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(FieldVector vector, final int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+        int left = numValuesToRead;
+        while (left > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            int n = Math.min(left, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    // In a run the same id (currentValue) is decoded for every slot.
+                    for (int i = 0; i < n; i++) {
+                        ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
+                        idx++;
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; i++) {
+                        // Take the id from the packed buffer for both widths. The int case used to decode
+                        // currentValue and left packedValuesBufferIdx untouched, which produced wrong values
+                        // and left later reads misaligned with the packed ids.
+                        int dictionaryId = packedValuesBuffer[packedValuesBufferIdx++];
+                        ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(dictionaryId) : dict.decodeToLong(dictionaryId));
+                        idx++;
+                    }
+                    break;
+            }
+            left -= n;
+            currentCount -= n;
+        }
+    }
+
+    /**
+     * Reads a batch of booleans into a {@link BitVector}, storing {@code 1} for true and {@code 0} for false.
+     * <p>
+     * This reader supplies one level per slot. A level equal to {@code maxDefLevel} marks a present value,
+     * which is read from {@code valuesReader}; any other level marks a null.
+     */
+    public void readBatchOfBooleans(
+            final FieldVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+        int slot = numValsInVector;
+        int remaining = batchSize;
+        while (remaining > 0) {
+            if (this.currentCount == 0) {
+                this.readNextGroup();
+            }
+            final int chunk = Math.min(remaining, this.currentCount);
+            switch (mode) {
+                case RLE:
+                    // One level covers the whole run: either chunk nulls or chunk values.
+                    if (currentValue != maxDefLevel) {
+                        setNulls(nullabilityHolder, slot, chunk, vector.getValidityBuffer());
+                        slot += chunk;
+                    } else {
+                        for (int end = slot + chunk; slot < end; slot++) {
+                            ((BitVector) vector).setSafe(slot, valuesReader.readBoolean() ? 1 : 0);
+                        }
+                    }
+                    break;
+                case PACKED:
+                    // Every slot has its own level in packedValuesBuffer.
+                    for (int end = slot + chunk; slot < end; slot++) {
+                        if (packedValuesBuffer[packedValuesBufferIdx++] != maxDefLevel) {
+                            setNull(nullabilityHolder, slot, vector.getValidityBuffer());
+                        } else {
+                            ((BitVector) vector).setSafe(slot, valuesReader.readBoolean() ? 1 : 0);
+                        }
+                    }
+                    break;
+            }
+            remaining -= chunk;
+            this.currentCount -= chunk;
+        }
+    }
+
+    private void setBinaryInVector(VarBinaryVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
+        byte[] byteArray = new byte[typeWidth];
+        valuesReader.getBuffer(typeWidth).get(byteArray);
+        vector.setSafe(bufferIdx, byteArray);
+    }
+
+    private void setNextNValuesInVector(
+            int typeWidth, int maxDefLevel, NullabilityHolder nullabilityHolder,
+            BytesReader valuesReader, int bufferIdx, FieldVector vector, int n) {
+        ArrowBuf validityBuffer = vector.getValidityBuffer();
+        int validityBufferIdx = bufferIdx;
+        if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
+                BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+                validityBufferIdx++;
+            }
+            ByteBuffer buffer = valuesReader.getBuffer(n * typeWidth);
+            vector.getDataBuffer().setBytes(bufferIdx * typeWidth, buffer);
+        } else {
+            setNulls(nullabilityHolder, bufferIdx, n, validityBuffer);
+        }
+    }
+
+    private void setNull(NullabilityHolder nullabilityHolder, int bufferIdx, ArrowBuf validityBuffer) {
+        nullabilityHolder.setNull(bufferIdx);
+        BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+    }
+
+    private void setNulls(NullabilityHolder nullabilityHolder, int bufferIdx, int n, ArrowBuf validityBuffer) {
+        for (int i = 0; i < n; i++) {
+            nullabilityHolder.setNull(bufferIdx);
+            BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+            bufferIdx++;
+        }
+    }
+
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
similarity index 56%
rename from parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
index dbffdb0..bbaddca 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
@@ -16,41 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.parquet;
 
-public class NullabilityHolder {
-  private final boolean[] isNull;
-  private int numNulls;
+package org.apache.iceberg.parquet.vectorized;
 
-  public NullabilityHolder(int batchSize) {
-    this.isNull = new boolean[batchSize];
-  }
-
-
-  public void setNull(int idx) {
-    isNull[idx] =  true;
-    numNulls++;
-  }
-
-  public void setNulls(int idx, int num) {
-    int i = 0;
-    while (i < num) {
-      isNull[idx] = true;
-      numNulls++;
-      idx++;
-      i++;
-    }
-  }
-
-  public boolean isNullAt(int idx) {
-    return isNull[idx];
-  }
-
-  public boolean hasNulls() {
-    return numNulls > 0;
-  }
-
-  public int numNulls() {
-    return numNulls;
-  }
+/**
+ * Marker interface for vectorized Iceberg readers; it declares no methods of its own.
+ */
+public interface VectorizedReader {
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
index 1d4a4ed..f8fbe5e 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
@@ -11,7 +11,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
 import java.util.Map;
-import java.util.UUID;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.spark.sql.functions.*;
@@ -35,7 +34,7 @@ public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDicti
                     .withColumn("longCol",
                             when(pmod(col("id"), lit(9))
                                     .equalTo(lit(0)), lit(0l))
-                                    .when(expr("id > NUM_ROWS/2"), lit(UUID.randomUUID().toString()))
+                                    //.when(expr("id > NUM_ROWS/2"), lit(UUID.randomUUID().toString()))
                                     .when(pmod(col("id"), lit(9))
                                             .equalTo(lit(1)), lit(1l))
                                     .when(pmod(col("id"), lit(9))
@@ -72,8 +71,8 @@ public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDicti
                                     .when(col("longCol")
                                             .equalTo(lit(8)), lit("8"))
                                         .when(col("longCol")
-                                            .equalTo(lit(9)), lit("9"))
-                                    .otherwise(lit(UUID.randomUUID().toString())));
+                                            .equalTo(lit(9)), lit("9")));
+                                    //.otherwise(lit(UUID.randomUUID().toString())));
             appendAsFile(df);
         }
     }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedFallbackToPlainEncodingStringsBenchmark.java
similarity index 66%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedFallbackToPlainEncodingStringsBenchmark.java
index 1d4a4ed..27e8ce2 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedFallbackToPlainEncodingStringsBenchmark.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iceberg.spark.source.parquet.vectorized;
 
 import com.google.common.collect.Maps;
@@ -16,7 +35,7 @@ import java.util.UUID;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.spark.sql.functions.*;
 
-public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDictionaryEncodedBenchmark {
+public class VectorizedFallbackToPlainEncodingStringsBenchmark extends VectorizedDictionaryEncodedBenchmark {
     @Override
     protected final Table initTable() {
         Schema schema = new Schema(
@@ -33,48 +52,37 @@ public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDicti
         for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
             Dataset<Row> df = spark().range(NUM_ROWS)
                     .withColumn("longCol",
-                            when(pmod(col("id"), lit(9))
-                                    .equalTo(lit(0)), lit(0l))
-                                    .when(expr("id > NUM_ROWS/2"), lit(UUID.randomUUID().toString()))
+                            when(expr("id > 10000000/2"), lit(3l))
+                                    .when(pmod(col("id"), lit(9))
+                                            .equalTo(lit(0)), lit(1l))
                                     .when(pmod(col("id"), lit(9))
                                             .equalTo(lit(1)), lit(1l))
                                     .when(pmod(col("id"), lit(9))
-                                            .equalTo(lit(2)), lit(2l))
+                                            .equalTo(lit(2)), lit(1l))
                                     .when(pmod(col("id"), lit(9))
-                                            .equalTo(lit(3)), lit(3l))
+                                            .equalTo(lit(3)), lit(1l))
                                     .when(pmod(col("id"), lit(9))
-                                            .equalTo(lit(4)), lit(4l))
+                                            .equalTo(lit(4)), lit(1l))
                                     .when(pmod(col("id"), lit(9))
-                                            .equalTo(lit(5)), lit(5l))
+                                            .equalTo(lit(5)), lit(2l))
                                     .when(pmod(col("id"), lit(9))
-                                            .equalTo(lit(6)), lit(6l))
+                                            .equalTo(lit(6)), lit(2l))
                                     .when(pmod(col("id"), lit(9))
-                                            .equalTo(lit(7)), lit(7l))
+                                            .equalTo(lit(7)), lit(2l))
                                     .when(pmod(col("id"), lit(9))
-                                            .equalTo(lit(8)), lit(8l))
+                                            .equalTo(lit(8)), lit(2l))
                                     .otherwise(lit(2l)))
                     .drop("id")
                     .withColumn("stringCol",
                             when(col("longCol")
-                                    .equalTo(lit(1)), lit("1"))
-                                    .when(col("longCol")
-                                            .equalTo(lit(2)), lit("2"))
-                                    .when(col("longCol")
-                                            .equalTo(lit(3)), lit("3"))
+                                    .equalTo(lit(1l)), lit("1"))
                                     .when(col("longCol")
-                                            .equalTo(lit(4)), lit("4"))
+                                            .equalTo(lit(2l)), lit("2"))
                                     .when(col("longCol")
-                                            .equalTo(lit(5)), lit("5"))
-                                    .when(col("longCol")
-                                            .equalTo(lit(6)), lit("6"))
-                                    .when(col("longCol")
-                                            .equalTo(lit(7)), lit("7"))
-                                    .when(col("longCol")
-                                            .equalTo(lit(8)), lit("8"))
-                                        .when(col("longCol")
-                                            .equalTo(lit(9)), lit("9"))
+                                            .equalTo(lit(3l)), lit(UUID.randomUUID().toString()))
                                     .otherwise(lit(UUID.randomUUID().toString())));
             appendAsFile(df);
         }
     }
 }
+
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
index aad8602..daee625 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
@@ -22,16 +22,13 @@ package org.apache.iceberg.spark.data.vector;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowSchemaUtil;
-import org.apache.iceberg.parquet.BatchedReader;
-import org.apache.iceberg.parquet.ColumnarBatchReader;
+import org.apache.iceberg.parquet.vectorized.VectorizedReader;
+import org.apache.iceberg.parquet.vectorized.ColumnarBatchReaders;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.parquet.VectorReader;
+import org.apache.iceberg.parquet.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.GroupType;
@@ -42,147 +39,134 @@ import org.apache.spark.sql.execution.arrow.ArrowUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 public class VectorizedSparkParquetReaders {
 
-  private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
-
-  private VectorizedSparkParquetReaders() {
-  }
-
-  @SuppressWarnings("unchecked")
-  public static ColumnarBatchReader buildReader(
-      Schema tableSchema,
-      Schema expectedSchema,
-      MessageType fileSchema) {
-
-    return buildReader(tableSchema, expectedSchema, fileSchema,
-        VectorReader.DEFAULT_BATCH_SIZE);
-  }
-
-  @SuppressWarnings("unchecked")
-  public static ColumnarBatchReader buildReader(
-      Schema tableSchema,
-      Schema expectedSchema,
-      MessageType fileSchema,
-      Integer recordsPerBatch) {
-
-    LOG.info("=> [VectorizedSparkParquetReaders] recordsPerBatch = {}", recordsPerBatch);
-    return (ColumnarBatchReader)
-        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-            new ReadBuilderBatched(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
-  }
-
-  private static class ReadBuilderBatched extends TypeWithSchemaVisitor<BatchedReader> {
-    private final MessageType parquetSchema;
-    private final Schema projectedIcebergSchema;
-    private final Schema tableIcebergSchema;
-    private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
-    private final BufferAllocator rootAllocator;
-    private final int recordsPerBatch;
-
-    ReadBuilderBatched(
-        Schema tableSchema,
-        Schema projectedIcebergSchema,
-        MessageType parquetSchema,
-        int recordsPerBatch) {
-      this.parquetSchema = parquetSchema;
-      this.tableIcebergSchema = tableSchema;
-      this.projectedIcebergSchema = projectedIcebergSchema;
-      this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
-      this.recordsPerBatch = recordsPerBatch;
-      this.rootAllocator = ArrowUtils.rootAllocator().newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
-      LOG.info("=> [ReadBuilder] recordsPerBatch = {}", this.recordsPerBatch);
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
 
-    @Override
-    public BatchedReader message(
-        Types.StructType expected, MessageType message,
-        List<BatchedReader> fieldReaders) {
-      return struct(expected, message.asGroupType(), fieldReaders);
+    private VectorizedSparkParquetReaders() {
     }
 
-    @Override
-    public BatchedReader struct(
-        Types.StructType expected, GroupType struct,
-        List<BatchedReader> fieldReaders) {
-
-      // this works on struct fields and the root iceberg schema which itself is a struct.
-
-      // match the expected struct's order
-      Map<Integer, BatchedReader> readersById = Maps.newHashMap();
-      Map<Integer, Type> typesById = Maps.newHashMap();
-      List<Type> fields = struct.getFields();
-
-      for (int i = 0; i < fields.size(); i += 1) {
-        Type fieldType = fields.get(i);
-        int id = fieldType.getId().intValue();
-        readersById.put(id, fieldReaders.get(i));
-        typesById.put(id, fieldType);
-      }
-
-      List<Types.NestedField> icebergFields = expected != null ?
-          expected.fields() : ImmutableList.of();
-
-      List<BatchedReader> reorderedFields = Lists.newArrayListWithExpectedSize(
-          icebergFields.size());
-
-      List<Type> types = Lists.newArrayListWithExpectedSize(icebergFields.size());
-
-      for (Types.NestedField field : icebergFields) {
-        int id = field.fieldId();
-        BatchedReader reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(null); // anjali-todo We need a NullVectorReader
-          types.add(null);
-        }
-      }
-
-      return new ColumnarBatchReader(types, expected, reorderedFields);
+    @SuppressWarnings("unchecked")
+    public static ColumnarBatchReaders buildReader(
+            Schema tableSchema,
+            Schema expectedSchema,
+            MessageType fileSchema) {
+        return buildReader(tableSchema, expectedSchema, fileSchema,
+                VectorizedArrowReader.DEFAULT_BATCH_SIZE);
     }
 
-    @Override
-    public BatchedReader primitive(
-        org.apache.iceberg.types.Type.PrimitiveType expected,
-        PrimitiveType primitive) {
-
-      // Create arrow vector for this field
-      int parquetFieldId = primitive.getId().intValue();
-      ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
-      Types.NestedField icebergField = tableIcebergSchema.findField(parquetFieldId);
-      return new VectorReader(desc, icebergField, rootAllocator, recordsPerBatch);
+    @SuppressWarnings("unchecked")
+    public static ColumnarBatchReaders buildReader(
+            Schema tableSchema,
+            Schema expectedSchema,
+            MessageType fileSchema,
+            Integer recordsPerBatch) {
+        LOG.info("=> [VectorizedSparkParquetReaders] recordsPerBatch = {}", recordsPerBatch);
+        return (ColumnarBatchReaders)
+                TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+                        new VectorReaderBuilder(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
     }
 
-    private String[] currentPath() {
-      String[] path = new String[fieldNames.size()];
-      if (!fieldNames.isEmpty()) {
-        Iterator<String> iter = fieldNames.descendingIterator();
-        for (int i = 0; iter.hasNext(); i += 1) {
-          path[i] = iter.next();
+    private static class VectorReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader> {
+        private final MessageType parquetSchema;
+        private final Schema projectedIcebergSchema;
+        private final Schema tableIcebergSchema;
+        private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
+        private final BufferAllocator rootAllocator;
+        private final int recordsPerBatch;
+
+        VectorReaderBuilder(
+                Schema tableSchema,
+                Schema projectedIcebergSchema,
+                MessageType parquetSchema,
+                int recordsPerBatch) {
+            this.parquetSchema = parquetSchema;
+            this.tableIcebergSchema = tableSchema;
+            this.projectedIcebergSchema = projectedIcebergSchema;
+            this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
+            this.recordsPerBatch = recordsPerBatch;
+            this.rootAllocator = ArrowUtils.rootAllocator().newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+            LOG.info("=> [ReadBuilder] recordsPerBatch = {}", this.recordsPerBatch);
         }
-      }
 
-      return path;
-    }
+        @Override
+        public VectorizedReader message(
+                Types.StructType expected, MessageType message,
+                List<VectorizedReader> fieldReaders) {
+            return struct(expected, message.asGroupType(), fieldReaders);
+        }
 
-    protected MessageType type() {
-      return parquetSchema;
-    }
+        @Override
+        public VectorizedReader struct(
+                Types.StructType expected, GroupType struct,
+                List<VectorizedReader> fieldReaders) {
+            // Builds the batch reader for a struct. message() delegates here, so this handles both
+            // struct fields and the root iceberg schema, which itself is a struct.
+
+            // Index the file's readers and Parquet types by field id to match the expected struct's order.
+            Map<Integer, VectorizedReader> readersById = Maps.newHashMap();
+            Map<Integer, Type> typesById = Maps.newHashMap();
+            List<Type> fields = struct.getFields();
+
+            for (int i = 0; i < fields.size(); i += 1) {
+                Type fieldType = fields.get(i);
+                int id = fieldType.getId().intValue();
+                readersById.put(id, fieldReaders.get(i));
+                typesById.put(id, fieldType);
+            }
+            // A null expected struct is treated as having no fields.
+            List<Types.NestedField> icebergFields = expected != null ?
+                    expected.fields() : ImmutableList.of();
+
+            List<VectorizedReader> reorderedFields = Lists.newArrayListWithExpectedSize(
+                    icebergFields.size());
+
+            List<Type> types = Lists.newArrayListWithExpectedSize(icebergFields.size());
+            // Emit one (reader, type) pair per expected field, in the expected struct's field order.
+            for (Types.NestedField field : icebergFields) {
+                int id = field.fieldId();
+                VectorizedReader reader = readersById.get(id);
+                if (reader != null) {
+                    reorderedFields.add(reader);
+                    types.add(typesById.get(id));
+                } else {
+                    reorderedFields.add(null); // TODO(anjali): no reader for this field id; we need a NullVectorReader
+                    types.add(null);
+                }
+            }
+
+            return new ColumnarBatchReaders(types, expected, reorderedFields);
+        }
+
+        @Override
+        public VectorizedReader primitive(
+                org.apache.iceberg.types.Type.PrimitiveType expected,
+                PrimitiveType primitive) {
 
-    protected String[] path(String name) {
-      String[] path = new String[fieldNames.size() + 1];
-      path[fieldNames.size()] = name;
+            // Create arrow vector for this field
+            int parquetFieldId = primitive.getId().intValue();
+            ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
+            Types.NestedField icebergField = tableIcebergSchema.findField(parquetFieldId);
+            return new VectorizedArrowReader(desc, icebergField, rootAllocator, recordsPerBatch);
+        }
 
-      if (!fieldNames.isEmpty()) {
-        Iterator<String> iter = fieldNames.descendingIterator();
-        for (int i = 0; iter.hasNext(); i += 1) {
-          path[i] = iter.next();
+        private String[] currentPath() {
+            String[] path = new String[fieldNames.size()];
+            if (!fieldNames.isEmpty()) {
+                Iterator<String> iter = fieldNames.descendingIterator();
+                for (int i = 0; iter.hasNext(); i += 1) {
+                    path[i] = iter.next();
+                }
+            }
+            return path;
         }
-      }
 
-      return path;
+        protected MessageType type() {
+            return parquetSchema;
+        }
     }
-  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 5114db2..8ff0bb6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -55,7 +55,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.VectorReader;
+import org.apache.iceberg.parquet.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
@@ -127,7 +127,7 @@ class Reader implements DataSourceReader,
 
     } else {
 
-      this.numRecordsPerBatch = VectorReader.DEFAULT_BATCH_SIZE;
+      this.numRecordsPerBatch = VectorizedArrowReader.DEFAULT_BATCH_SIZE;
     }
     LOG.info("=> Set Config numRecordsPerBatch = {}", numRecordsPerBatch);
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 70d176e..1c1d376 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -41,10 +41,10 @@ public abstract class AvroDataTest {
     protected StructType getSupportedPrimitives() {
         return StructType.of(
                 required(100, "id", LongType.get()),
-                required(101, "data", Types.StringType.get()),
-                required(102, "b", Types.BooleanType.get()),
+                optional(101, "data", Types.StringType.get()),
+                optional(102, "b", Types.BooleanType.get()),
                 optional(103, "i", Types.IntegerType.get()),
-                required(104, "l", LongType.get()),
+                optional(104, "l", LongType.get()),
                 optional(105, "f", Types.FloatType.get()),
                 optional(106, "d", Types.DoubleType.get()),
                 optional(107, "date", Types.DateType.get()),
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/DictionaryData.java b/spark/src/test/java/org/apache/iceberg/spark/data/DictionaryData.java
new file mode 100644
index 0000000..74c8cda
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/DictionaryData.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+/**
+ * Generates Avro records whose primitive values are picked from a small fixed set of values per type.
+ * The resulting low-cardinality data is intended to be dictionary-encodable, as the entry point's name says.
+ */
+public class DictionaryData {
+
+    /** Size of the value pool for each primitive type other than boolean, which is always {@code true}. */
+    private static final int NUM_DISTINCT_VALUES = 3;
+
+    /** An optional value becomes null when a draw from [0, NULL_ODDS) equals 1, i.e. 5% of the time. */
+    private static final int NULL_ODDS = 20;
+
+    /** Upper bound on redraws when looking for a map key that has not been used yet. */
+    private static final int MAX_KEY_ATTEMPTS = 100;
+
+    /**
+     * Generates {@code numRecords} records for {@code schema} from a random source seeded with {@code seed}.
+     *
+     * @param schema     Iceberg schema that every record is generated for
+     * @param numRecords number of records to generate
+     * @param seed       seed of the random source that drives value and null selection
+     * @return the generated records, in generation order
+     */
+    public static List<GenericData.Record> generateDictionaryEncodableData(Schema schema, int numRecords, long seed) {
+        List<GenericData.Record> records = Lists.newArrayListWithExpectedSize(numRecords);
+        DictionaryDataGenerator generator = new DictionaryDataGenerator(schema, seed);
+        for (int i = 0; i < numRecords; i += 1) {
+            records.add((GenericData.Record) TypeUtil.visit(schema, generator));
+        }
+        return records;
+    }
+
+    /** Schema visitor that produces a value for every visited type; visiting a whole schema yields one record. */
+    private static class DictionaryDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> {
+        private final Map<Type, org.apache.avro.Schema> typeToSchema;
+        private final Random random;
+
+        private DictionaryDataGenerator(Schema schema, long seed) {
+            this.typeToSchema = AvroSchemaUtil.convertTypes(schema.asStruct(), "test");
+            this.random = new Random(seed);
+        }
+
+        @Override
+        public GenericData.Record schema(Schema schema, Supplier<Object> structResult) {
+            return (GenericData.Record) structResult.get();
+        }
+
+        @Override
+        public GenericData.Record struct(Types.StructType struct, Iterable<Object> fieldResults) {
+            GenericData.Record rec = new GenericData.Record(typeToSchema.get(struct));
+
+            // copy the field values into the record by position
+            List<Object> values = Lists.newArrayList(fieldResults);
+            for (int i = 0; i < values.size(); i += 1) {
+                rec.put(i, values.get(i));
+            }
+
+            return rec;
+        }
+
+        @Override
+        public Object field(Types.NestedField field, Supplier<Object> fieldResult) {
+            // only optional fields may be null; required fields do not consume a null draw
+            if (field.isOptional() && nextIsNull()) {
+                return null;
+            }
+            return fieldResult.get();
+        }
+
+        @Override
+        public Object list(Types.ListType list, Supplier<Object> elementResult) {
+            int numElements = random.nextInt(20);
+
+            List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
+            for (int i = 0; i < numElements; i += 1) {
+                if (list.isElementOptional() && nextIsNull()) {
+                    result.add(null);
+                } else {
+                    result.add(elementResult.get());
+                }
+            }
+
+            return result;
+        }
+
+        @Override
+        public Object map(Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
+            int numEntries = random.nextInt(20);
+
+            Map<Object, Object> result = Maps.newLinkedHashMap();
+            Set<Object> keySet = Sets.newHashSet();
+            for (int i = 0; i < numEntries; i += 1) {
+                Object key = keyResult.get();
+                // Keys come from a small pool (three values per primitive type), so an unused key may not exist.
+                // Redraw a bounded number of times instead of looping forever once the pool is exhausted.
+                for (int attempt = 0; keySet.contains(key) && attempt < MAX_KEY_ATTEMPTS; attempt += 1) {
+                    key = keyResult.get();
+                }
+                if (!keySet.add(key)) {
+                    break; // no unused key was found: return the entries generated so far
+                }
+
+                if (map.isValueOptional() && nextIsNull()) {
+                    result.put(key, null);
+                } else {
+                    result.put(key, valueResult.get());
+                }
+            }
+
+            return result;
+        }
+
+        @Override
+        public Object primitive(Type.PrimitiveType primitive) {
+            Object result = generatePrimitive(primitive, random);
+            // For the primitives that Avro needs a different type than Spark, fix
+            // them here.
+            switch (primitive.typeId()) {
+                case STRING:
+                    return ((UTF8String) result).toString();
+                case FIXED:
+                    return new GenericData.Fixed(typeToSchema.get(primitive),
+                            (byte[]) result);
+                case BINARY:
+                    return ByteBuffer.wrap((byte[]) result);
+                case UUID:
+                    return UUID.nameUUIDFromBytes((byte[]) result);
+                case DECIMAL:
+                    return ((Decimal) result).toJavaBigDecimal();
+                default:
+                    return result;
+            }
+        }
+
+        /** Draws from the random source and reports whether an optional value should be null (1 draw in 20). */
+        private boolean nextIsNull() {
+            return random.nextInt(NULL_ODDS) == 1;
+        }
+    }
+
+    /**
+     * Picks one value from the fixed pool for the given primitive type.
+     *
+     * @param primitive type to generate a value for
+     * @param random    random source; exactly one draw is consumed per call, whatever the type
+     * @return the value, in the representation that {@code DictionaryDataGenerator.primitive} converts for Avro
+     * @throws IllegalArgumentException if the type id is not handled
+     */
+    private static Object generatePrimitive(Type.PrimitiveType primitive,
+                                            Random random) {
+        int choice = random.nextInt(NUM_DISTINCT_VALUES);
+        switch (primitive.typeId()) {
+            case BOOLEAN:
+                return true; // doesn't really matter for booleans since they are not dictionary encoded
+
+            case INTEGER:
+            case DATE:
+                return choice;
+
+            case LONG:
+            case TIME:
+            case TIMESTAMP:
+                return (long) choice;
+
+            case FLOAT:
+                return (float) choice;
+
+            case DOUBLE:
+                return (double) choice;
+
+            case STRING:
+                return UTF8String.fromString(Integer.toString(choice));
+
+            case FIXED:
+                return bytesStartingWith(((Types.FixedType) primitive).length(), choice);
+
+            case BINARY:
+                return bytesStartingWith(4, choice);
+
+            case UUID:
+                // 16 input bytes for UUID.nameUUIDFromBytes in DictionaryDataGenerator.primitive
+                return bytesStartingWith(16, choice);
+
+            case DECIMAL:
+                // unscaled values 1, 2 and 3 at the type's scale
+                int scale = ((Types.DecimalType) primitive).scale();
+                return Decimal.apply(new BigDecimal(BigInteger.valueOf(choice + 1L), scale));
+
+            default:
+                throw new IllegalArgumentException(
+                        "Cannot generate random value for unknown type: " + primitive);
+        }
+    }
+
+    /** Returns a zero-filled array of {@code length} bytes whose first byte, if there is one, is {@code value}. */
+    private static byte[] bytesStartingWith(int length, int value) {
+        byte[] bytes = new byte[length];
+        if (length > 0) {
+            bytes[0] = (byte) value;
+        }
+        return bytes;
+    }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 928ec8a..35bb5c7 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -33,9 +33,12 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+
+import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.serde2.io.DateWritable;
@@ -53,6 +56,7 @@ import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
@@ -218,11 +222,6 @@ public class TestHelpers {
 
         Type fieldType = fields.get(i).type();
         Object expectedValue = expRec.get(i);
-        // System.out.println("   -> Checking Row "+r+", field #"+i
-        //     + " , Field:"+ fields.get(i).name()
-        //     + " , optional:"+fields.get(i).isOptional()
-        //     + " , type:"+fieldType.typeId()
-        //     + " , expected:"+expectedValue);
         if (actualRow.isNullAt(i)) {
           Assert.assertTrue("Expect null at " + r, expectedValue == null);
         } else {
@@ -233,6 +232,30 @@ public class TestHelpers {
     }
   }
 
+  /** Checks each row of {@code batch} against {@code expected}, including the null flags of the Arrow vectors. */
+  public static void assertArrowVectors(Types.StructType struct, List<Record> expected, ColumnarBatch batch) {
+    List<Types.NestedField> structFields = struct.fields();
+    for (int row = 0; row < batch.numRows(); row++) {
+      Record expectedRecord = expected.get(row);
+      InternalRow actual = batch.getRow(row);
+      for (int col = 0; col < structFields.size(); col += 1) {
+        ColumnVector sparkVector = batch.column(col);
+        Assert.assertTrue(sparkVector instanceof IcebergArrowColumnVector);
+        ValueVector arrowVector = ((IcebergArrowColumnVector) sparkVector).getAccessor().getUnderlyingArrowVector();
+        Type colType = structFields.get(col).type();
+        Object expectedValue = expectedRecord.get(col);
+        if (!actual.isNullAt(col)) {
+          Object actualValue = actual.get(col, convert(colType));
+          Assert.assertFalse("Expected the value to be set as non-null in the arrow vector", arrowVector.isNull(row));
+          assertEqualsUnsafe(colType, expectedValue, actualValue);
+        } else {
+          Assert.assertTrue("Expect null at " + row, expectedValue == null);
+          Assert.assertTrue("Expected the value to be set as null in the arrow vector", arrowVector.isNull(row));
+        }
+      }
+    }
+  }
+
   private static void assertEqualsUnsafe(Types.ListType list, Collection<?> expected, ArrayData actual) {
     Type elementType = list.elementType();
     List<?> expectedElements = Lists.newArrayList(expected);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetDictionaryEncodedVectorizedReader.java
similarity index 67%
copy from spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
copy to spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetDictionaryEncodedVectorizedReader.java
index 410d5e3..e494b98 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetDictionaryEncodedVectorizedReader.java
@@ -14,25 +14,15 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-public class TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader extends TestSparkParquetVectorizedReader {
-
-    @Override
-    protected Types.StructType getSupportedPrimitives() {
-        return Types.StructType.of(
-                required(100, "id", Types.LongType.get()),
-                required(101, "data", Types.StringType.get()));
-    }
+public class TestSparkParquetDictionaryEncodedVectorizedReader extends TestSparkParquetVectorizedReader {
 
     @Override
     protected void writeAndValidate(Schema schema) throws IOException {
-        setupArrowFlags();
         // Write test data
         Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
                 type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
 
-        List<GenericData.Record> expected = RandomData.generateListWithFallBackDictionaryEncodingForStrings(schema, 1000000, 0L, 0.5f);
+        List<GenericData.Record> expected = DictionaryData.generateDictionaryEncodableData(schema, 100000, 0L);
 
         // write a test parquet file using iceberg writer
         File testFile = temp.newFile();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
index 410d5e3..fdb6f4d 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
@@ -27,7 +27,6 @@ public class TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader ext
 
     @Override
     protected void writeAndValidate(Schema schema) throws IOException {
-        setupArrowFlags();
         // Write test data
         Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
                 type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
index a1697e4..b14a55f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
@@ -36,16 +36,23 @@ import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.spark.data.TestHelpers.assertArrowVectors;
 import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
 
 public class TestSparkParquetVectorizedReader extends AvroDataTest {
 
+  @Before
+  public void setupArrowFlags() {
+    System.setProperty("arrow.enable_unsafe_memory_access", "true");
+    System.setProperty("arrow.enable_null_check_for_get", "false");
+  }
+
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    setupArrowFlags();
     // Write test data
     Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
         type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
@@ -83,18 +90,10 @@ public class TestSparkParquetVectorizedReader extends AvroDataTest {
         for (int i = numExpectedRead; i < numExpectedRead + batch.numRows(); i++) {
           expectedBatch.add(expected.get(i));
         }
-
-        // System.out.println("-> Check "+numExpectedRead+" - "+ (numExpectedRead+batch.numRows()));
-        assertEqualsUnsafe(schema.asStruct(), expectedBatch, batch);
-
-        System.out.println("Batch read with " + batch.numRows() + " rows. Read " + numRowsRead + " till now. " +
-            "Expected batch " + expectedBatch.size());
-
+        assertArrowVectors(schema.asStruct(), expectedBatch, batch);
         numExpectedRead += batch.numRows();
       }
-
       Assert.assertEquals(expected.size(), numRowsRead);
-
     }
   }
 
@@ -133,8 +132,4 @@ public class TestSparkParquetVectorizedReader extends AvroDataTest {
     System.out.println("Not Supported");
   }
 
-  void setupArrowFlags() {
-    System.setProperty("arrow.enable_unsafe_memory_access", "true");
-    System.setProperty("arrow.enable_null_check_for_get", "false");
-  }
 }