Posted to commits@iceberg.apache.org by ry...@apache.org on 2021/04/29 17:36:07 UTC

[iceberg] branch master updated: Add Arrow vectorized reader (#2286)

This is an automated email from the ASF dual-hosted git repository.

rymurr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 657de68  Add Arrow vectorized reader (#2286)
657de68 is described below

commit 657de686a1e74e9b5b77fb81e23192cfeecab057
Author: mayursrivastava <ma...@twosigma.com>
AuthorDate: Thu Apr 29 13:35:50 2021 -0400

    Add Arrow vectorized reader (#2286)
    
    Co-authored-by: Mayur Srivastava <ma...@gmail.com>
---
 .../iceberg/arrow/vectorized/ArrowBatchReader.java | 104 +++
 .../iceberg/arrow/vectorized/ArrowReader.java      | 325 +++++++
 .../arrow}/vectorized/ArrowVectorAccessor.java     |  69 +-
 .../arrow/vectorized/ArrowVectorAccessors.java     |  70 ++
 .../iceberg/arrow/vectorized/ColumnVector.java     | 116 +++
 .../iceberg/arrow/vectorized/ColumnarBatch.java    |  88 ++
 .../GenericArrowVectorAccessorFactory.java         | 695 +++++++++++++++
 .../arrow/vectorized/VectorizedArrowReader.java    |   7 +-
 .../arrow/vectorized/VectorizedReaderBuilder.java  | 130 +++
 .../vectorized/VectorizedTableScanIterable.java    |  73 ++
 .../iceberg/arrow/vectorized/ArrowReaderTest.java  | 935 +++++++++++++++++++++
 build.gradle                                       |  10 +-
 .../vectorized/ArrowVectorAccessorFactory.java     | 110 +++
 .../data/vectorized/ArrowVectorAccessors.java      | 496 +----------
 .../data/vectorized/IcebergArrowColumnVector.java  |   5 +-
 .../vectorized/VectorizedSparkParquetReaders.java  | 107 +--
 16 files changed, 2711 insertions(+), 629 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
new file mode 100644
index 0000000..9eaed19
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * A collection of vectorized readers, one per column in the expected read schema, along with their Arrow vector holders.
+ * This class owns the Arrow vectors and is responsible for closing them.
+ */
+class ArrowBatchReader implements VectorizedReader<ColumnarBatch> {
+
+  private final VectorizedArrowReader[] readers;
+  private final VectorHolder[] vectorHolders;
+
+  ArrowBatchReader(List<VectorizedReader<?>> readers) {
+    this.readers = readers.stream()
+        .map(VectorizedArrowReader.class::cast)
+        .toArray(VectorizedArrowReader[]::new);
+    this.vectorHolders = new VectorHolder[readers.size()];
+  }
+
+  @Override
+  public final void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
+    for (VectorizedArrowReader reader : readers) {
+      reader.setRowGroupInfo(pageStore, metaData, rowPosition);
+    }
+  }
+
+  @Override
+  public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
+    Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
+
+    if (reuse == null) {
+      closeVectors();
+    }
+
+    ColumnVector[] columnVectors = new ColumnVector[readers.length];
+    for (int i = 0; i < readers.length; i += 1) {
+      vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+      int numRowsInVector = vectorHolders[i].numValues();
+      Preconditions.checkState(
+          numRowsInVector == numRowsToRead,
+          "Number of rows in the vector %s didn't match expected %s ", numRowsInVector,
+          numRowsToRead);
+      // Handle null vector for constant case
+      columnVectors[i] = new ColumnVector(vectorHolders[i]);
+    }
+    return new ColumnarBatch(numRowsToRead, columnVectors);
+  }
+
+  private void closeVectors() {
+    for (int i = 0; i < vectorHolders.length; i++) {
+      if (vectorHolders[i] != null) {
+        // Release any resources used by the vector
+        if (vectorHolders[i].vector() != null) {
+          vectorHolders[i].vector().close();
+        }
+        vectorHolders[i] = null;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    for (VectorizedReader<?> reader : readers) {
+      reader.close();
+    }
+    closeVectors();
+  }
+
+  @Override
+  public void setBatchSize(int batchSize) {
+    for (VectorizedArrowReader reader : readers) {
+      if (reader != null) {
+        reader.setBatchSize(batchSize);
+      }
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
new file mode 100644
index 0000000..60b79a6
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Vectorized reader that returns an iterator of {@link ColumnarBatch}.
+ * See {@link #open(CloseableIterable)} to learn about the
+ * behavior of the iterator.
+ *
+ * <p>The following Iceberg data types are supported and have been tested:
+ * <ul>
+ *     <li>Iceberg: {@link Types.BooleanType}, Arrow: {@link MinorType#BIT}</li>
+ *     <li>Iceberg: {@link Types.IntegerType}, Arrow: {@link MinorType#INT}</li>
+ *     <li>Iceberg: {@link Types.LongType}, Arrow: {@link MinorType#BIGINT}</li>
+ *     <li>Iceberg: {@link Types.FloatType}, Arrow: {@link MinorType#FLOAT4}</li>
+ *     <li>Iceberg: {@link Types.DoubleType}, Arrow: {@link MinorType#FLOAT8}</li>
+ *     <li>Iceberg: {@link Types.StringType}, Arrow: {@link MinorType#VARCHAR}</li>
+ *     <li>Iceberg: {@link Types.TimestampType} (both with and without timezone),
+ *         Arrow: {@link MinorType#TIMEMICRO}</li>
+ *     <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
+ *     <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ * </ul>
+ *
+ * <p>Features that don't work in this implementation:
+ * <ul>
+ *     <li>Type promotion: In case of type promotion, the Arrow vector corresponding to
+ *     the data type in the parquet file is returned instead of the data type in the latest schema.
+ *     See https://github.com/apache/iceberg/issues/2483.</li>
+ *     <li>Columns with constant values are physically encoded as a dictionary. The Arrow vector
+ *     type is int32 instead of the type as per the schema.
+ *     See https://github.com/apache/iceberg/issues/2484.</li>
+ *     <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
+ *     {@link Types.StructType}, {@link Types.UUIDType}, {@link Types.FixedType} and
+ *     {@link Types.DecimalType} are not supported.
+ *     See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
+ *     <li>Iceberg v2 spec is not supported.
+ *     See https://github.com/apache/iceberg/issues/2487.</li>
+ * </ul>
+ */
+public class ArrowReader extends CloseableGroup {
+
+  private final Schema schema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final int batchSize;
+  private final boolean reuseContainers;
+
+  /**
+   * Create a new instance of the reader.
+   *
+   * @param scan the table scan object.
+   * @param batchSize the maximum number of rows per Arrow batch.
+   * @param reuseContainers whether to reuse Arrow vectors when iterating through the data.
+   *                        If set to {@code false}, every {@link Iterator#next()} call creates
+   *                        new instances of Arrow vectors.
+   *                        If set to {@code true}, the Arrow vectors in the previous
+   *                        {@link Iterator#next()} may be reused for the data returned
+   *                        in the current {@link Iterator#next()}.
+   *                        This option avoids repeated memory allocation.
+   *                        Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+   *                        in the previous {@link Iterator#next()} call are closed before creating
+   *                        new instances for the current {@link Iterator#next()}.
+   */
+  public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
+    this.schema = scan.schema();
+    this.io = scan.table().io();
+    this.encryption = scan.table().encryption();
+    this.batchSize = batchSize;
+    this.reuseContainers = reuseContainers;
+  }
+
+  /**
+   * Returns a new iterator of {@link ColumnarBatch} objects.
+   * <p>
+   * Note that the reader owns the {@link ColumnarBatch} objects and takes care of closing them.
+   * The caller should not hold onto a {@link ColumnarBatch} or try to close it.
+   *
+   * <p>If {@code reuseContainers} is {@code false}, the Arrow vectors in the
+   * previous {@link ColumnarBatch} are closed before returning the next {@link ColumnarBatch} object.
+   * This implies that the caller should either use the {@link ColumnarBatch} or transfer the ownership of
+   * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   *
+   * <p>If {@code reuseContainers} is {@code true}, the Arrow vectors in the
+   * previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
+   * This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
+   * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   */
+  public CloseableIterator<ColumnarBatch> open(CloseableIterable<CombinedScanTask> tasks) {
+    CloseableIterator<ColumnarBatch> itr = new VectorizedCombinedScanIterator(
+        tasks,
+        schema,
+        null,
+        io,
+        encryption,
+        true,
+        batchSize,
+        reuseContainers
+    );
+    addCloseable(itr);
+    return itr;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close(); // close data files
+  }
+
+  /**
+   * Reads the data files and returns an iterator of {@link ColumnarBatch}.
+   * Only the Parquet data file format is supported.
+   */
+  private static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
+
+    private final List<FileScanTask> fileTasks;
+    private final Iterator<FileScanTask> fileItr;
+    private final Map<String, InputFile> inputFiles;
+    private final Schema expectedSchema;
+    private final String nameMapping;
+    private final boolean caseSensitive;
+    private final int batchSize;
+    private final boolean reuseContainers;
+    private CloseableIterator<ColumnarBatch> currentIterator;
+    private ColumnarBatch current;
+    private FileScanTask currentTask;
+
+    /**
+     * Create a new instance.
+     *
+     * @param tasks             Combined file scan tasks.
+     * @param expectedSchema    Read schema. The returned data will have this schema.
+     * @param nameMapping       Mapping from external schema names to Iceberg type IDs.
+     * @param io                File I/O.
+     * @param encryptionManager Encryption manager.
+     * @param caseSensitive     If {@code true}, column names are case sensitive.
+     *                          If {@code false}, column names are not case sensitive.
+     * @param batchSize         Batch size in number of rows. Each Arrow batch contains
+     *                          a maximum of {@code batchSize} rows.
+     * @param reuseContainers   If set to {@code false}, every {@link Iterator#next()} call creates
+     *                          new instances of Arrow vectors.
+     *                          If set to {@code true}, the Arrow vectors in the previous
+     *                          {@link Iterator#next()} may be reused for the data returned
+     *                          in the current {@link Iterator#next()}.
+     *                          This option avoids repeated memory allocation.
+     *                          Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+     *                          in the previous {@link Iterator#next()} call are closed before creating
+     *                          new instances for the current {@link Iterator#next()}.
+     */
+    VectorizedCombinedScanIterator(
+        CloseableIterable<CombinedScanTask> tasks,
+        Schema expectedSchema,
+        String nameMapping,
+        FileIO io,
+        EncryptionManager encryptionManager,
+        boolean caseSensitive,
+        int batchSize,
+        boolean reuseContainers) {
+      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+          .map(CombinedScanTask::files)
+          .flatMap(Collection::stream)
+          .collect(Collectors.toList());
+      this.fileItr = fileTasks.iterator();
+      this.inputFiles = Collections.unmodifiableMap(fileTasks.stream()
+          .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+          .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()))
+          .map(encryptionManager::decrypt)
+          .collect(Collectors.toMap(InputFile::location, Function.identity())));
+      this.currentIterator = CloseableIterator.empty();
+      this.expectedSchema = expectedSchema;
+      this.nameMapping = nameMapping;
+      this.caseSensitive = caseSensitive;
+      this.batchSize = batchSize;
+      this.reuseContainers = reuseContainers;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        while (true) {
+          if (currentIterator.hasNext()) {
+            this.current = currentIterator.next();
+            return true;
+          } else if (fileItr.hasNext()) {
+            this.currentIterator.close();
+            this.currentTask = fileItr.next();
+            this.currentIterator = open(currentTask);
+          } else {
+            this.currentIterator.close();
+            return false;
+          }
+        }
+      } catch (IOException | RuntimeException e) {
+        if (currentTask != null && !currentTask.isDataTask()) {
+          throw new RuntimeException(
+              "Error reading file: " + getInputFile(currentTask).location() +
+                  ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +
+                  "Ensure that the tasks passed to the constructor are data tasks. " +
+                  "The file scan tasks are: " + fileTasks,
+              e);
+        } else {
+          throw new RuntimeException(
+              "An error occurred while iterating through the file scan tasks or closing the iterator," +
+                  " see the stacktrace for further information. The file scan tasks are: " + fileTasks, e);
+        }
+      }
+    }
+
+    @Override
+    public ColumnarBatch next() {
+      return current;
+    }
+
+    CloseableIterator<ColumnarBatch> open(FileScanTask task) {
+      CloseableIterable<ColumnarBatch> iter;
+      InputFile location = getInputFile(task);
+      Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
+      if (task.file().format() == FileFormat.PARQUET) {
+        Parquet.ReadBuilder builder = Parquet.read(location)
+            .project(expectedSchema)
+            .split(task.start(), task.length())
+            .createBatchedReaderFunc(fileSchema -> buildReader(expectedSchema,
+                fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED))
+            .recordsPerBatch(batchSize)
+            .filter(task.residual())
+            .caseSensitive(caseSensitive);
+
+        if (reuseContainers) {
+          builder.reuseContainers();
+        }
+        if (nameMapping != null) {
+          builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+        }
+
+        iter = builder.build();
+      } else {
+        throw new UnsupportedOperationException(
+            "Format: " + task.file().format() + " not supported for batched reads");
+      }
+      return iter.iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+      // close the current iterator
+      this.currentIterator.close();
+
+      // exhaust the task iterator
+      while (fileItr.hasNext()) {
+        fileItr.next();
+      }
+    }
+
+    private InputFile getInputFile(FileScanTask task) {
+      Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
+      return inputFiles.get(task.file().path().toString());
+    }
+
+    /**
+     * Build the {@link ArrowBatchReader} for the expected schema and file schema.
+     *
+     * @param expectedSchema         Expected schema of the data returned.
+     * @param fileSchema             Schema of the data file.
+     * @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
+     */
+    private static ArrowBatchReader buildReader(
+        Schema expectedSchema,
+        MessageType fileSchema,
+        boolean setArrowValidityVector) {
+      return (ArrowBatchReader)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new VectorizedReaderBuilder(
+                  expectedSchema, fileSchema, setArrowValidityVector, ImmutableMap.of(), ArrowBatchReader::new));
+    }
+  }
+}
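For reference, here is a minimal usage sketch of the new ArrowReader; the Table variable, the selected column names, and the printBatches helper are illustrative only and are not part of this commit:

    import java.io.IOException;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.arrow.vectorized.ArrowReader;
    import org.apache.iceberg.arrow.vectorized.ColumnarBatch;
    import org.apache.iceberg.io.CloseableIterator;

    // A minimal sketch, assuming an already-loaded Iceberg Table (e.g. from a catalog).
    static void printBatches(Table table) throws IOException {
      TableScan scan = table.newScan().select("id", "data");
      // 1024 rows per batch; reuseContainers=false, so each next() returns fresh vectors.
      try (ArrowReader reader = new ArrowReader(scan, 1024, false);
           CloseableIterator<ColumnarBatch> batches = reader.open(scan.planTasks())) {
        while (batches.hasNext()) {
          ColumnarBatch batch = batches.next();  // owned by the reader; do not close it here
          VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
          System.out.println(root.contentToTSVString());
        }
      }
    }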
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessor.java
similarity index 56%
rename from spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java
rename to arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessor.java
index c9c9959..c912531 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessor.java
@@ -17,79 +17,84 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.data.vectorized;
+package org.apache.iceberg.arrow.vectorized;
 
 import org.apache.arrow.vector.ValueVector;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.vectorized.ArrowColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarArray;
-import org.apache.spark.unsafe.types.UTF8String;
-
-@SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class ArrowVectorAccessor {
 
+public class ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+    implements AutoCloseable {
   private final ValueVector vector;
-  private final ArrowColumnVector[] childColumns;
+  private final ChildVectorT[] childColumns;
 
-  ArrowVectorAccessor(ValueVector vector) {
-    this.vector = vector;
-    this.childColumns = new ArrowColumnVector[0];
+  protected ArrowVectorAccessor(ValueVector vector) {
+    this(vector, null);
   }
 
-  ArrowVectorAccessor(ValueVector vector, ArrowColumnVector[] children) {
+  protected ArrowVectorAccessor(ValueVector vector, ChildVectorT[] children) {
     this.vector = vector;
     this.childColumns = children;
   }
 
-  final void close() {
-    for (ArrowColumnVector column : childColumns) {
-      // Closing an ArrowColumnVector is expected to not throw any exception
-      column.close();
+  @Override
+  public void close() {
+    if (childColumns != null) {
+      for (ChildVectorT column : childColumns) {
+        try {
+          // Closing an ArrowColumnVector is expected to not throw any exception
+          column.close();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
     }
     vector.close();
   }
 
-  boolean getBoolean(int rowId) {
+  public boolean getBoolean(int rowId) {
     throw new UnsupportedOperationException("Unsupported type: boolean");
   }
 
-  int getInt(int rowId) {
+  public int getInt(int rowId) {
     throw new UnsupportedOperationException("Unsupported type: int");
   }
 
-  long getLong(int rowId) {
+  public long getLong(int rowId) {
     throw new UnsupportedOperationException("Unsupported type: long");
   }
 
-  float getFloat(int rowId) {
+  public float getFloat(int rowId) {
     throw new UnsupportedOperationException("Unsupported type: float");
   }
 
-  double getDouble(int rowId) {
+  public double getDouble(int rowId) {
     throw new UnsupportedOperationException("Unsupported type: double");
   }
 
-  Decimal getDecimal(int rowId, int precision, int scale) {
-    throw new UnsupportedOperationException("Unsupported type: decimal");
+  public byte[] getBinary(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: binary");
   }
 
-  UTF8String getUTF8String(int rowId) {
-    throw new UnsupportedOperationException("Unsupported type: UTF8String");
+  public DecimalT getDecimal(int rowId, int precision, int scale) {
+    throw new UnsupportedOperationException("Unsupported type: decimal");
   }
 
-  byte[] getBinary(int rowId) {
-    throw new UnsupportedOperationException("Unsupported type: binary");
+  public Utf8StringT getUTF8String(int rowId) {
+    throw new UnsupportedOperationException("Unsupported type: UTF8String");
   }
 
-  ColumnarArray getArray(int rowId) {
+  public ArrayT getArray(int rowId) {
     throw new UnsupportedOperationException("Unsupported type: array");
   }
 
-  ArrowColumnVector childColumn(int pos) {
-    return childColumns[pos];
+  public ChildVectorT childColumn(int pos) {
+    if (childColumns != null) {
+      return childColumns[pos];
+    } else {
+      throw new IndexOutOfBoundsException("Child columns is null hence cannot find index: " + pos);
+    }
   }
 
-  public ValueVector getVector() {
+  public final ValueVector getVector() {
     return vector;
   }
 }
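As a sketch of the new generic signature, here is a hypothetical accessor parameterized with plain Java types; this subclass is illustrative and does not exist in the commit:

    import java.math.BigDecimal;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.arrow.vector.FieldVector;
    import org.apache.arrow.vector.VarCharVector;
    import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor;

    // DecimalT=BigDecimal, Utf8StringT=String, ArrayT=List<?>, ChildVectorT=FieldVector
    class PlainStringAccessor extends ArrowVectorAccessor<BigDecimal, String, List<?>, FieldVector> {
      private final VarCharVector vector;

      PlainStringAccessor(VarCharVector vector) {
        super(vector);
        this.vector = vector;
      }

      @Override
      public String getUTF8String(int rowId) {
        // All other get* methods keep the default "unsupported type" behavior.
        return new String(vector.get(rowId), StandardCharsets.UTF_8);
      }
    }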
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
new file mode 100644
index 0000000..ccebf33
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.nio.charset.StandardCharsets;
+import java.util.function.Supplier;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory.StringFactory;
+
+final class ArrowVectorAccessors {
+
+  private static final GenericArrowVectorAccessorFactory<?, String, ?, ?> factory;
+
+  static {
+    factory = new GenericArrowVectorAccessorFactory<>(
+        throwingSupplier("Decimal type is not supported"),
+        JavaStringFactory::new,
+        throwingSupplier("Struct type is not supported"),
+        throwingSupplier("List type is not supported")
+    );
+  }
+
+  private static <T> Supplier<T> throwingSupplier(String message) {
+    return () -> {
+      throw new UnsupportedOperationException(message);
+    };
+  }
+
+  private ArrowVectorAccessors() {
+    throw new UnsupportedOperationException(ArrowVectorAccessors.class.getName() + " cannot be instantiated.");
+  }
+
+  static ArrowVectorAccessor<?, String, ?, ?> getVectorAccessor(VectorHolder holder) {
+    return factory.getVectorAccessor(holder);
+  }
+
+  private static final class JavaStringFactory implements StringFactory<String> {
+    @Override
+    public Class<String> getGenericClass() {
+      return String.class;
+    }
+
+    @Override
+    public String ofRow(VarCharVector vector, int rowId) {
+      return ofBytes(vector.get(rowId));
+    }
+
+    @Override
+    public String ofBytes(byte[] bytes) {
+      return new String(bytes, StandardCharsets.UTF_8);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
new file mode 100644
index 0000000..e395926
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.types.Types;
+
+/**
+ * This class is inspired by Spark's {@code ColumnVector}.
+ * This class represents the column data for an Iceberg table query.
+ * It wraps an arrow {@link FieldVector} and provides simple
+ * accessors for the row values. Advanced users can access
+ * the {@link FieldVector}.
+ * <p>
+ *   Supported Iceberg data types:
+ *   <ul>
+ *     <li>{@link Types.BooleanType}</li>
+ *     <li>{@link Types.IntegerType}</li>
+ *     <li>{@link Types.LongType}</li>
+ *     <li>{@link Types.FloatType}</li>
+ *     <li>{@link Types.DoubleType}</li>
+ *     <li>{@link Types.StringType}</li>
+ *     <li>{@link Types.BinaryType}</li>
+ *     <li>{@link Types.TimestampType} (with and without timezone)</li>
+ *     <li>{@link Types.DateType}</li>
+ *   </ul>
+ */
+public class ColumnVector implements AutoCloseable {
+  private final VectorHolder vectorHolder;
+  private final ArrowVectorAccessor<?, String, ?, ?> accessor;
+  private final NullabilityHolder nullabilityHolder;
+
+  ColumnVector(VectorHolder vectorHolder) {
+    this.vectorHolder = vectorHolder;
+    this.nullabilityHolder = vectorHolder.nullabilityHolder();
+    this.accessor = getVectorAccessor(vectorHolder);
+  }
+
+  public FieldVector getFieldVector() {
+    // TODO Convert dictionary encoded vectors to correctly typed arrow vector.
+    //   e.g. convert long dictionary encoded vector to a BigIntVector.
+    return vectorHolder.vector();
+  }
+
+  public boolean hasNull() {
+    return nullabilityHolder.hasNulls();
+  }
+
+  public int numNulls() {
+    return nullabilityHolder.numNulls();
+  }
+
+  @Override
+  public void close() {
+    accessor.close();
+  }
+
+  public boolean isNullAt(int rowId) {
+    return nullabilityHolder.isNullAt(rowId) == 1;
+  }
+
+  public boolean getBoolean(int rowId) {
+    return accessor.getBoolean(rowId);
+  }
+
+  public int getInt(int rowId) {
+    return accessor.getInt(rowId);
+  }
+
+  public long getLong(int rowId) {
+    return accessor.getLong(rowId);
+  }
+
+  public float getFloat(int rowId) {
+    return accessor.getFloat(rowId);
+  }
+
+  public double getDouble(int rowId) {
+    return accessor.getDouble(rowId);
+  }
+
+  public String getString(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getUTF8String(rowId);
+  }
+
+  public byte[] getBinary(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor.getBinary(rowId);
+  }
+
+  private static ArrowVectorAccessor<?, String, ?, ?> getVectorAccessor(VectorHolder holder) {
+    return ArrowVectorAccessors.getVectorAccessor(holder);
+  }
+}
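A short, hypothetical sketch of null-aware access through ColumnVector, given a ColumnarBatch named batch as in the reader sketch above (the column index and type are assumptions for illustration):

    // Assumes column 1 of the batch is a string column in the read schema.
    ColumnVector names = batch.column(1);
    for (int row = 0; row < batch.numRows(); row++) {
      // getString/getBinary return null for null rows; primitive getters need an isNullAt check first.
      String name = names.isNullAt(row) ? "<null>" : names.getString(row);
      System.out.println(name);
    }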
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnarBatch.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnarBatch.java
new file mode 100644
index 0000000..976447e
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnarBatch.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.Arrays;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This class is inspired by Spark's {@code ColumnarBatch}.
+ * This class wraps a columnar batch in the result set of an Iceberg table query.
+ */
+public class ColumnarBatch implements AutoCloseable {
+
+  private final int numRows;
+  private final ColumnVector[] columns;
+
+  ColumnarBatch(int numRows, ColumnVector[] columns) {
+    for (int i = 0; i < columns.length; i++) {
+      int columnValueCount = columns[i].getFieldVector().getValueCount();
+      Preconditions.checkArgument(numRows == columnValueCount,
+          "Number of rows (=" + numRows + ") != column[" + i + "] size (=" + columnValueCount + ")");
+    }
+    this.numRows = numRows;
+    this.columns = columns;
+  }
+
+  /**
+   * Create a new instance of {@link VectorSchemaRoot}
+   * from the Arrow vectors stored in this batch.
+   * The Arrow vectors are owned by the reader.
+   */
+  public VectorSchemaRoot createVectorSchemaRootFromVectors() {
+    return VectorSchemaRoot.of(Arrays.stream(columns)
+        .map(ColumnVector::getFieldVector)
+        .toArray(FieldVector[]::new));
+  }
+
+  /**
+   * Called to close all the columns in this batch. It is not valid to access the data after calling this. This must be
+   * called at the end to clean up memory allocations.
+   */
+  @Override
+  public void close() {
+    for (ColumnVector c : columns) {
+      c.close();
+    }
+  }
+
+  /**
+   * Returns the number of columns that make up this batch.
+   */
+  public int numCols() {
+    return columns.length;
+  }
+
+  /**
+   * Returns the number of rows in this batch, including filtered rows.
+   */
+  public int numRows() {
+    return numRows;
+  }
+
+  /**
+   * Returns the column at `ordinal`.
+   */
+  public ColumnVector column(int ordinal) {
+    return columns[ordinal];
+  }
+}
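And a small sketch of inspecting a batch's shape and per-column null counts, again using the hypothetical batch variable from the sketches above:

    System.out.printf("rows=%d, cols=%d%n", batch.numRows(), batch.numCols());
    for (int i = 0; i < batch.numCols(); i++) {
      ColumnVector col = batch.column(i);
      if (col.hasNull()) {
        System.out.printf("column %d has %d null value(s)%n", i, col.numNulls());
      }
    }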
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
new file mode 100644
index 0000000..df4e216
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * This class creates a typed {@link ArrowVectorAccessor} from a {@link VectorHolder}.
+ * It provides a generic implementation for the following Arrow types:
+ * <ul>
+ *   <li>Decimal type can be deserialized to a type that supports decimal,
+ *   e.g. BigDecimal or Spark's Decimal.</li>
+ *   <li>UTF8 String type can be deserialized to a Java String or Spark's UTF8String.</li>
+ *   <li>List type: the child elements of a list can be deserialized to Spark's ColumnarArray or similar type.</li>
+ *   <li>Struct type: the child elements of a struct can be deserialized to Spark's ArrowColumnVector
+ *   or similar type.</li>
+ * </ul>
+ * @param <DecimalT> A concrete type that can represent a decimal.
+ * @param <Utf8StringT> A concrete type that can represent a UTF8 string.
+ * @param <ArrayT> A concrete type that can represent an array value in a list vector, e.g. Spark's ColumnarArray.
+ * @param <ChildVectorT> A concrete type that can represent a child vector in a struct, e.g. Spark's ArrowColumnVector.
+ */
+public class GenericArrowVectorAccessorFactory<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable> {
+
+  private final Supplier<DecimalFactory<DecimalT>> decimalFactorySupplier;
+  private final Supplier<StringFactory<Utf8StringT>> stringFactorySupplier;
+  private final Supplier<StructChildFactory<ChildVectorT>> structChildFactorySupplier;
+  private final Supplier<ArrayFactory<ChildVectorT, ArrayT>> arrayFactorySupplier;
+
+  /**
+   * The constructor is parameterized using the decimal, string, struct and array factories.
+   * If a specific type is not supported, the factory supplier can raise an
+   * {@link UnsupportedOperationException}.
+   */
+  protected GenericArrowVectorAccessorFactory(
+          Supplier<DecimalFactory<DecimalT>> decimalFactorySupplier,
+          Supplier<StringFactory<Utf8StringT>> stringFactorySupplier,
+          Supplier<StructChildFactory<ChildVectorT>> structChildFactorySupplier,
+          Supplier<ArrayFactory<ChildVectorT, ArrayT>> arrayFactorySupplier) {
+    this.decimalFactorySupplier = decimalFactorySupplier;
+    this.stringFactorySupplier = stringFactorySupplier;
+    this.structChildFactorySupplier = structChildFactorySupplier;
+    this.arrayFactorySupplier = arrayFactorySupplier;
+  }
+
+  public ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getVectorAccessor(VectorHolder holder) {
+    Dictionary dictionary = holder.dictionary();
+    boolean isVectorDictEncoded = holder.isDictionaryEncoded();
+    FieldVector vector = holder.vector();
+    if (isVectorDictEncoded) {
+      ColumnDescriptor desc = holder.descriptor();
+      PrimitiveType primitive = desc.getPrimitiveType();
+      return getDictionaryVectorAccessor(dictionary, desc, vector, primitive);
+    } else {
+      return getPlainVectorAccessor(vector);
+    }
+  }
+
+  private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDictionaryVectorAccessor(
+      Dictionary dictionary,
+      ColumnDescriptor desc,
+      FieldVector vector, PrimitiveType primitive) {
+    Preconditions.checkState(vector instanceof IntVector, "Dictionary ids should be stored in IntVectors only");
+    if (primitive.getOriginalType() != null) {
+      switch (desc.getPrimitiveType().getOriginalType()) {
+        case ENUM:
+        case JSON:
+        case UTF8:
+        case BSON:
+          return new DictionaryStringAccessor<>((IntVector) vector, dictionary, stringFactorySupplier.get());
+        case INT_64:
+        case TIMESTAMP_MILLIS:
+        case TIMESTAMP_MICROS:
+          return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
+        case DECIMAL:
+          switch (primitive.getPrimitiveTypeName()) {
+            case BINARY:
+            case FIXED_LEN_BYTE_ARRAY:
+              return new DictionaryDecimalBinaryAccessor<>(
+                  (IntVector) vector,
+                  dictionary,
+                  decimalFactorySupplier.get());
+            case INT64:
+              return new DictionaryDecimalLongAccessor<>(
+                  (IntVector) vector,
+                  dictionary,
+                  decimalFactorySupplier.get());
+            case INT32:
+              return new DictionaryDecimalIntAccessor<>(
+                  (IntVector) vector,
+                  dictionary,
+                  decimalFactorySupplier.get());
+            default:
+              throw new UnsupportedOperationException(
+                  "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+          }
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported logical type: " + primitive.getOriginalType());
+      }
+    } else {
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new DictionaryBinaryAccessor<>((IntVector) vector, dictionary);
+        case FLOAT:
+          return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
+        case INT64:
+          return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
+        case DOUBLE:
+          return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT>
+      getPlainVectorAccessor(FieldVector vector) {
+    if (vector instanceof BitVector) {
+      return new BooleanAccessor<>((BitVector) vector);
+    } else if (vector instanceof IntVector) {
+      return new IntAccessor<>((IntVector) vector);
+    } else if (vector instanceof BigIntVector) {
+      return new LongAccessor<>((BigIntVector) vector);
+    } else if (vector instanceof Float4Vector) {
+      return new FloatAccessor<>((Float4Vector) vector);
+    } else if (vector instanceof Float8Vector) {
+      return new DoubleAccessor<>((Float8Vector) vector);
+    } else if (vector instanceof DecimalVector) {
+      return new DecimalAccessor<>((DecimalVector) vector, decimalFactorySupplier.get());
+    } else if (vector instanceof VarCharVector) {
+      return new StringAccessor<>((VarCharVector) vector, stringFactorySupplier.get());
+    } else if (vector instanceof VarBinaryVector) {
+      return new BinaryAccessor<>((VarBinaryVector) vector);
+    } else if (vector instanceof DateDayVector) {
+      return new DateAccessor<>((DateDayVector) vector);
+    } else if (vector instanceof TimeStampMicroTZVector) {
+      return new TimestampMicroTzAccessor<>((TimeStampMicroTZVector) vector);
+    } else if (vector instanceof TimeStampMicroVector) {
+      return new TimestampMicroAccessor<>((TimeStampMicroVector) vector);
+    } else if (vector instanceof ListVector) {
+      ListVector listVector = (ListVector) vector;
+      return new ArrayAccessor<>(listVector, arrayFactorySupplier.get());
+    } else if (vector instanceof StructVector) {
+      StructVector structVector = (StructVector) vector;
+      return new StructAccessor<>(structVector, structChildFactorySupplier.get());
+    }
+    throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
+  }
+
+  private static class BooleanAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final BitVector vector;
+
+    BooleanAccessor(BitVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final boolean getBoolean(int rowId) {
+      return vector.get(rowId) == 1;
+    }
+  }
+
+  private static class IntAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final IntVector vector;
+
+    IntAccessor(IntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final int getInt(int rowId) {
+      return vector.get(rowId);
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      return getInt(rowId);
+    }
+  }
+
+  private static class LongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final BigIntVector vector;
+
+    LongAccessor(BigIntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryLongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final IntVector offsetVector;
+    private final long[] decodedDictionary;
+
+    DictionaryLongAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToLong(dictionary::decodeToLong)
+          .toArray();
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      return decodedDictionary[offsetVector.get(rowId)];
+    }
+  }
+
+  private static class FloatAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final Float4Vector vector;
+
+    FloatAccessor(Float4Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final float getFloat(int rowId) {
+      return vector.get(rowId);
+    }
+
+    @Override
+    public final double getDouble(int rowId) {
+      return getFloat(rowId);
+    }
+  }
+
+  private static class DictionaryFloatAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final IntVector offsetVector;
+    private final float[] decodedDictionary;
+
+    DictionaryFloatAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = new float[dictionary.getMaxId() + 1];
+      for (int i = 0; i <= dictionary.getMaxId(); i++) {
+        decodedDictionary[i] = dictionary.decodeToFloat(i);
+      }
+    }
+
+    @Override
+    public final float getFloat(int rowId) {
+      return decodedDictionary[offsetVector.get(rowId)];
+    }
+
+    @Override
+    public final double getDouble(int rowId) {
+      return getFloat(rowId);
+    }
+  }
+
+  private static class DoubleAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final Float8Vector vector;
+
+    DoubleAccessor(Float8Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final double getDouble(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryDoubleAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final IntVector offsetVector;
+    private final double[] decodedDictionary;
+
+    DictionaryDoubleAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToDouble(dictionary::decodeToDouble)
+          .toArray();
+    }
+
+    @Override
+    public final double getDouble(int rowId) {
+      return decodedDictionary[offsetVector.get(rowId)];
+    }
+  }
+
+  private static class StringAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final VarCharVector vector;
+    private final StringFactory<Utf8StringT> stringFactory;
+
+    StringAccessor(VarCharVector vector, StringFactory<Utf8StringT> stringFactory) {
+      super(vector);
+      this.vector = vector;
+      this.stringFactory = stringFactory;
+    }
+
+    @Override
+    public final Utf8StringT getUTF8String(int rowId) {
+      return stringFactory.ofRow(vector, rowId);
+    }
+  }
+
+  private static class DictionaryStringAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final Utf8StringT[] decodedDictionary;
+    private final IntVector offsetVector;
+
+    DictionaryStringAccessor(IntVector vector, Dictionary dictionary, StringFactory<Utf8StringT> stringFactory) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToObj(dictionary::decodeToBinary)
+          .map(binary -> stringFactory.ofBytes(binary.getBytes()))
+          .toArray(genericArray(stringFactory.getGenericClass()));
+    }
+
+    @Override
+    public final Utf8StringT getUTF8String(int rowId) {
+      int offset = offsetVector.get(rowId);
+      return decodedDictionary[offset];
+    }
+  }
+
+  private static class BinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final VarBinaryVector vector;
+
+    BinaryAccessor(VarBinaryVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final byte[] getBinary(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryBinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final IntVector offsetVector;
+    private final byte[][] decodedDictionary;
+
+    DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+          .mapToObj(dictionary::decodeToBinary)
+          .map(Binary::getBytes)
+          .toArray(byte[][]::new);
+    }
+
+    @Override
+    public final byte[] getBinary(int rowId) {
+      int offset = offsetVector.get(rowId);
+      return decodedDictionary[offset];
+    }
+  }
+
+  private static class DateAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final DateDayVector vector;
+
+    DateAccessor(DateDayVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final int getInt(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class TimestampMicroTzAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final TimeStampMicroTZVector vector;
+
+    TimestampMicroTzAccessor(TimeStampMicroTZVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class TimestampMicroAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+      extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final TimeStampMicroVector vector;
+
+    TimestampMicroAccessor(TimeStampMicroVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class ArrayAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final ListVector vector;
+    private final ChildVectorT arrayData;
+    private final ArrayFactory<ChildVectorT, ArrayT> arrayFactory;
+
+    ArrayAccessor(ListVector vector, ArrayFactory<ChildVectorT, ArrayT> arrayFactory) {
+      super(vector);
+      this.vector = vector;
+      this.arrayFactory = arrayFactory;
+      this.arrayData = arrayFactory.ofChild(vector.getDataVector());
+    }
+
+    @Override
+    public final ArrayT getArray(int rowId) {
+      return arrayFactory.ofRow(vector, arrayData, rowId);
+    }
+  }
+
+  private static class StructAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    StructAccessor(StructVector structVector, StructChildFactory<ChildVectorT> structChildFactory) {
+      super(structVector, IntStream.range(0, structVector.size())
+              .mapToObj(structVector::getVectorById)
+              .map(structChildFactory::of)
+              .toArray(genericArray(structChildFactory.getGenericClass())));
+    }
+  }
+
+  private static class DecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    private final DecimalVector vector;
+    private final DecimalFactory<DecimalT> decimalFactory;
+
+    DecimalAccessor(DecimalVector vector, DecimalFactory<DecimalT> decimalFactory) {
+      super(vector);
+      this.vector = vector;
+      this.decimalFactory = decimalFactory;
+    }
+
+    @Override
+    public final DecimalT getDecimal(int rowId, int precision, int scale) {
+      return decimalFactory.ofBigDecimal(
+              DecimalUtility.getBigDecimalFromArrowBuf(vector.getDataBuffer(), rowId, scale),
+              precision, scale);
+    }
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  private abstract static class
+      DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final DecimalT[] cache;
+    private final DecimalFactory<DecimalT> decimalFactory;
+    private final Dictionary parquetDictionary;
+    private final IntVector offsetVector;
+
+    private DictionaryDecimalAccessor(
+            IntVector vector,
+            Dictionary dictionary,
+            DecimalFactory<DecimalT> decimalFactory) {
+      super(vector);
+      this.offsetVector = vector;
+      this.parquetDictionary = dictionary;
+      this.decimalFactory = decimalFactory;
+      this.cache = genericArray(decimalFactory.getGenericClass(), dictionary.getMaxId() + 1);
+    }
+
+    protected long decodeToBinary(int dictId) {
+      return new BigInteger(parquetDictionary.decodeToBinary(dictId).getBytes()).longValue();
+    }
+
+    protected long decodeToLong(int dictId) {
+      return parquetDictionary.decodeToLong(dictId);
+    }
+
+    protected int decodeToInt(int dictId) {
+      return parquetDictionary.decodeToInt(dictId);
+    }
+
+    @Override
+    public final DecimalT getDecimal(int rowId, int precision, int scale) {
+      int dictId = offsetVector.get(rowId);
+      if (cache[dictId] == null) {
+        cache[dictId] = decimalFactory.ofLong(
+            decode(dictId),
+            precision,
+            scale);
+      }
+      return cache[dictId];
+    }
+
+    protected abstract long decode(int dictId);
+  }
+
+  private static class
+      DictionaryDecimalBinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    DictionaryDecimalBinaryAccessor(IntVector vector, Dictionary dictionary, DecimalFactory<DecimalT> decimalFactory) {
+      super(vector, dictionary, decimalFactory);
+    }
+
+    @Override
+    protected long decode(int dictId) {
+      return decodeToBinary(dictId);
+    }
+  }
+
+  private static class DictionaryDecimalLongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    DictionaryDecimalLongAccessor(IntVector vector, Dictionary dictionary, DecimalFactory<DecimalT> decimalFactory) {
+      super(vector, dictionary, decimalFactory);
+    }
+
+    @Override
+    protected long decode(int dictId) {
+      return decodeToLong(dictId);
+    }
+  }
+
+  private static class DictionaryDecimalIntAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+          extends DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+    DictionaryDecimalIntAccessor(IntVector vector, Dictionary dictionary, DecimalFactory<DecimalT> decimalFactory) {
+      super(vector, dictionary, decimalFactory);
+    }
+
+    @Override
+    protected long decode(int dictId) {
+      return decodeToInt(dictId);
+    }
+  }
+
+  /**
+   * Create a decimal value of type {@code DecimalT} from an arrow vector value.
+   * @param <DecimalT> A concrete type that can represent a decimal, e.g. Spark's Decimal.
+   */
+  protected interface DecimalFactory<DecimalT> {
+    /**
+     * Class of concrete decimal type.
+     */
+    Class<DecimalT> getGenericClass();
+
+    /**
+     * Create a decimal from the given long value, precision and scale.
+     */
+    DecimalT ofLong(long value, int precision, int scale);
+
+    /**
+     * Create a decimal from the given {@link BigDecimal} value, precision and scale.
+     */
+    DecimalT ofBigDecimal(BigDecimal value, int precision, int scale);
+  }
+
+  /**
+   * Create a UTF8 String value of type {@code Utf8StringT} from an arrow vector value.
+   * @param <Utf8StringT> A concrete type that can represent a UTF8 string.
+   */
+  protected interface StringFactory<Utf8StringT> {
+    /**
+     * Class of concrete UTF8 String type.
+     */
+    Class<Utf8StringT> getGenericClass();
+
+    /**
+     * Create a UTF8 String from the row value in the arrow vector.
+     */
+    Utf8StringT ofRow(VarCharVector vector, int rowId);
+
+    /**
+     * Create a UTF8 String from the byte array.
+     */
+    Utf8StringT ofBytes(byte[] bytes);
+  }
+
+  /**
+   * Create an array value of type {@code ArrayT} from an arrow vector value.
+   * @param <ArrayT> A concrete type that can represent an array value in a list vector,
+   *                e.g. Spark's ColumnarArray.
+   * @param <ChildVectorT> A concrete type that can represent a child vector in a struct,
+   *                     e.g. Spark's ArrowColumnVector.
+   */
+  protected interface ArrayFactory<ChildVectorT, ArrayT> {
+    /**
+     * Create a child vector of type {@code ChildVectorT} from the arrow child vector.
+     */
+    ChildVectorT ofChild(ValueVector childVector);
+
+    /**
+     * Create an array of type {@code ArrayT} from the row value in the arrow child vector.
+     */
+    ArrayT ofRow(ValueVector vector, ChildVectorT childData, int rowId);
+  }
+
+  /**
+   * Create a struct child vector of type {@code ChildVectorT} from an arrow vector value.
+   * @param <ChildVectorT> A concrete type that can represent a child vector in a struct,
+   *                     e.g. Spark's ArrowColumnVector.
+   */
+  protected interface StructChildFactory<ChildVectorT> {
+    /**
+     * Class of concrete child vector type.
+     */
+    Class<ChildVectorT> getGenericClass();
+
+    /**
+     * Create a child vector of a concrete type, such as Spark's ArrowColumnVector, from the arrow child vector.
+     */
+    ChildVectorT of(ValueVector childVector);
+  }
+
+  private static <T> IntFunction<T[]> genericArray(Class<T> genericClass) {
+    return length -> genericArray(genericClass, length);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T[] genericArray(Class<T> genericClass, int length) {
+    return (T[]) Array.newInstance(genericClass, length);
+  }
+}
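
A note on the factory interfaces above: DecimalFactory, StringFactory, ArrayFactory and StructChildFactory are the
extension points that engine-specific subclasses of GenericArrowVectorAccessorFactory implement; the Spark
implementation is added further down in this diff. As an illustration only (not part of this commit), a DecimalFactory
that materializes plain java.math.BigDecimal values could look like the following sketch:

    // Sketch only: a DecimalFactory for java.math.BigDecimal, declared inside a
    // hypothetical subclass of GenericArrowVectorAccessorFactory.
    private static final class BigDecimalFactory implements DecimalFactory<BigDecimal> {
      @Override
      public Class<BigDecimal> getGenericClass() {
        return BigDecimal.class;
      }

      @Override
      public BigDecimal ofLong(long value, int precision, int scale) {
        // the dictionary-decoded long is the unscaled value, e.g. (1234, scale 2) -> 12.34
        return BigDecimal.valueOf(value, scale);
      }

      @Override
      public BigDecimal ofBigDecimal(BigDecimal value, int precision, int scale) {
        return value;
      }
    }
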
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 6c97c27..5ed323f 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -31,6 +31,7 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
@@ -231,7 +232,11 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
             break;
           case TIMESTAMP_MICROS:
             this.vec = arrowField.createVector(rootAlloc);
-            ((TimeStampMicroTZVector) vec).allocateNew(batchSize);
+            if (((Types.TimestampType) icebergField.type()).shouldAdjustToUTC()) {
+              ((TimeStampMicroTZVector) vec).allocateNew(batchSize);
+            } else {
+              ((TimeStampMicroVector) vec).allocateNew(batchSize);
+            }
             this.readType = ReadType.LONG;
             this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
             break;
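
The new TIMESTAMP_MICROS branch above keys off the Iceberg timestamp type. A quick illustration of the mapping it
implements, using the standard Types.TimestampType factory methods:

    // Illustration only: which Arrow vector the reader now allocates for each Iceberg timestamp type.
    Types.TimestampType tsTz = Types.TimestampType.withZone();     // shouldAdjustToUTC() == true  -> TimeStampMicroTZVector
    Types.TimestampType ts = Types.TimestampType.withoutZone();    // shouldAdjustToUTC() == false -> TimeStampMicroVector
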
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
new file mode 100644
index 0000000..77bd089
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
+  private final MessageType parquetSchema;
+  private final Schema icebergSchema;
+  private final BufferAllocator rootAllocator;
+  private final Map<Integer, ?> idToConstant;
+  private final boolean setArrowValidityVector;
+  private final Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
+
+  public VectorizedReaderBuilder(
+      Schema expectedSchema,
+      MessageType parquetSchema,
+      boolean setArrowValidityVector, Map<Integer, ?> idToConstant,
+      Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory) {
+    this.parquetSchema = parquetSchema;
+    this.icebergSchema = expectedSchema;
+    this.rootAllocator = ArrowAllocation.rootAllocator()
+        .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+    this.setArrowValidityVector = setArrowValidityVector;
+    this.idToConstant = idToConstant;
+    this.readerFactory = readerFactory;
+  }
+
+  @Override
+  public VectorizedReader<?> message(
+      Types.StructType expected, MessageType message,
+      List<VectorizedReader<?>> fieldReaders) {
+    GroupType groupType = message.asGroupType();
+    Map<Integer, VectorizedReader<?>> readersById = Maps.newHashMap();
+    List<Type> fields = groupType.getFields();
+
+    IntStream.range(0, fields.size())
+        .filter(pos -> fields.get(pos).getId() != null)
+        .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos)));
+
+    List<Types.NestedField> icebergFields = expected != null ?
+        expected.fields() : ImmutableList.of();
+
+    List<VectorizedReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
+        icebergFields.size());
+
+    for (Types.NestedField field : icebergFields) {
+      int id = field.fieldId();
+      VectorizedReader<?> reader = readersById.get(id);
+      if (idToConstant.containsKey(id)) {
+        reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(idToConstant.get(id)));
+      } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
+        reorderedFields.add(VectorizedArrowReader.positions());
+      } else if (reader != null) {
+        reorderedFields.add(reader);
+      } else {
+        reorderedFields.add(VectorizedArrowReader.nulls());
+      }
+    }
+    return readerFactory.apply(reorderedFields);
+  }
+
+  @Override
+  public VectorizedReader<?> struct(
+      Types.StructType expected, GroupType groupType,
+      List<VectorizedReader<?>> fieldReaders) {
+    if (expected != null) {
+      throw new UnsupportedOperationException("Vectorized reads are not supported yet for struct fields");
+    }
+    return null;
+  }
+
+  @Override
+  public VectorizedReader<?> primitive(
+      org.apache.iceberg.types.Type.PrimitiveType expected,
+      PrimitiveType primitive) {
+
+    // Create arrow vector for this field
+    if (primitive.getId() == null) {
+      return null;
+    }
+    int parquetFieldId = primitive.getId().intValue();
+    ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
+    // Nested types not yet supported for vectorized reads
+    if (desc.getMaxRepetitionLevel() > 0) {
+      return null;
+    }
+    Types.NestedField icebergField = icebergSchema.findField(parquetFieldId);
+    if (icebergField == null) {
+      return null;
+    }
+    // Set the validity buffer if null checking is enabled in arrow
+    return new VectorizedArrowReader(desc, icebergField, rootAllocator, setArrowValidityVector);
+  }
+}
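
VectorizedReaderBuilder is a TypeWithSchemaVisitor, so callers visit the Parquet file schema with it and let the
readerFactory function combine the per-column readers into a single batch reader. A rough wiring sketch follows; it is
assumed rather than part of this commit, it mirrors how the Spark reader builds its ColumnarBatchReader, and it assumes
ArrowBatchReader accepts a List<VectorizedReader<?>>:

    // Sketch: expectedSchema (Iceberg Schema) and fileSchema (Parquet MessageType) are assumed inputs.
    VectorizedReader<?> batchReader = TypeWithSchemaVisitor.visit(
        expectedSchema.asStruct(),
        fileSchema,
        new VectorizedReaderBuilder(
            expectedSchema,
            fileSchema,
            true,                       // setArrowValidityVector
            ImmutableMap.of(),          // idToConstant: no partition constants in this sketch
            ArrowBatchReader::new));    // readerFactory
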
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedTableScanIterable.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedTableScanIterable.java
new file mode 100644
index 0000000..e5a81e8
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedTableScanIterable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * A vectorized implementation of the Iceberg reader that iterates over the table scan.
+ * See {@link ArrowReader} for details.
+ */
+public class VectorizedTableScanIterable extends CloseableGroup implements CloseableIterable<ColumnarBatch> {
+
+  private static final int BATCH_SIZE_IN_NUM_ROWS = 1 << 16;
+
+  private final ArrowReader reader;
+  private final CloseableIterable<CombinedScanTask> tasks;
+
+  /**
+   * Create a new instance using default values for {@code batchSize} and {@code reuseContainers}.
+   * The {@code batchSize} is set to {@link #BATCH_SIZE_IN_NUM_ROWS} and {@code reuseContainers}
+   * is set to {@code false}.
+   */
+  public VectorizedTableScanIterable(TableScan scan) {
+    this(scan, BATCH_SIZE_IN_NUM_ROWS, false);
+  }
+
+  /**
+   * Create a new instance.
+   *
+   * See {@link ArrowReader#ArrowReader(TableScan, int, boolean)} for details.
+   */
+  public VectorizedTableScanIterable(TableScan scan, int batchSize, boolean reuseContainers) {
+    this.reader = new ArrowReader(scan, batchSize, reuseContainers);
+    // start planning tasks in the background
+    this.tasks = scan.planTasks();
+  }
+
+  @Override
+  public CloseableIterator<ColumnarBatch> iterator() {
+    CloseableIterator<ColumnarBatch> iter = reader.open(tasks);
+    addCloseable(iter);
+    return iter;
+  }
+
+  @Override
+  public void close() throws IOException {
+    tasks.close(); // close manifests from scan planning
+    super.close(); // close data files
+  }
+}
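
Together with ArrowReader, this gives a simple way to consume an Iceberg table scan as Arrow data. A minimal usage
sketch (the table handle is assumed to come from elsewhere, e.g. HadoopTables as in the test below):

    TableScan scan = table.newScan().select("timestamp", "int", "string");
    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1024, false)) {
      for (ColumnarBatch batch : itr) {
        // each iteration produces a new batch of Arrow vectors; process it before advancing
        VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
        System.out.println("rows in batch: " + root.getRowCount());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
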
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
new file mode 100644
index 0000000..6829173
--- /dev/null
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link ArrowReader}.
+ * <p>All tests create a table with monthly partitions and write 1 year of data to the table.
+ */
+public class ArrowReaderTest {
+
+  private static final int NUM_ROWS_PER_MONTH = 20;
+  private static final ImmutableList<String> ALL_COLUMNS =
+      ImmutableList.of(
+          "timestamp",
+          "timestamp_nullable",
+          "boolean",
+          "boolean_nullable",
+          "int",
+          "int_nullable",
+          "long",
+          "long_nullable",
+          "float",
+          "float_nullable",
+          "double",
+          "double_nullable",
+          "timestamp_tz",
+          "timestamp_tz_nullable",
+          "string",
+          "string_nullable",
+          "bytes",
+          "bytes_nullable",
+          "date",
+          "date_nullable",
+          "int_promotion"
+      );
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  private HadoopTables tables;
+
+  private String tableLocation;
+  private List<GenericRecord> rowsWritten;
+
+  /**
+   * Read all rows and columns from the table without any filter. The test asserts that the Arrow {@link
+   * VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the vectors
+   * contain the expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadAll() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    readAndCheckQueryResult(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * This test writes each partition with constant value rows. The Arrow vectors returned are mostly of type int32,
+   * which is unexpected. This happens because of dictionary encoding at the storage level.
+   * <p>
+   * Following are the expected and actual Arrow schema:
+   * <pre>
+   * Expected Arrow Schema:
+   * timestamp: Timestamp(MICROSECOND, null) not null,
+   * timestamp_nullable: Timestamp(MICROSECOND, null),
+   * boolean: Bool not null,
+   * boolean_nullable: Bool,
+   * int: Int(32, true) not null,
+   * int_nullable: Int(32, true),
+   * long: Int(64, true) not null,
+   * long_nullable: Int(64, true),
+   * float: FloatingPoint(SINGLE) not null,
+   * float_nullable: FloatingPoint(SINGLE),
+   * double: FloatingPoint(DOUBLE) not null,
+   * double_nullable: FloatingPoint(DOUBLE),
+   * timestamp_tz: Timestamp(MICROSECOND, UTC) not null,
+   * timestamp_tz_nullable: Timestamp(MICROSECOND, UTC),
+   * string: Utf8 not null,
+   * string_nullable: Utf8,
+   * bytes: Binary not null,
+   * bytes_nullable: Binary,
+   * date: Date(DAY) not null,
+   * date_nullable: Date(DAY),
+   * int_promotion: Int(32, true) not null
+   *
+   * Actual Arrow Schema:
+   * timestamp: Int(32, true) not null,
+   * timestamp_nullable: Int(32, true),
+   * boolean: Bool not null,
+   * boolean_nullable: Bool,
+   * int: Int(32, true) not null,
+   * int_nullable: Int(32, true),
+   * long: Int(32, true) not null,
+   * long_nullable: Int(32, true),
+   * float: Int(32, true) not null,
+   * float_nullable: Int(32, true),
+   * double: Int(32, true) not null,
+   * double_nullable: Int(32, true),
+   * timestamp_tz: Int(32, true) not null,
+   * timestamp_tz_nullable: Int(32, true),
+   * string: Int(32, true) not null,
+   * string_nullable: Int(32, true),
+   * bytes: Int(32, true) not null,
+   * bytes_nullable: Int(32, true),
+   * date: Date(DAY) not null,
+   * date_nullable: Date(DAY),
+   * int_promotion: Int(32, true) not null
+   * </pre>
+   * <p>
+   * TODO: fix the returned Arrow vectors to have vector types consistent with Iceberg types.
+   * <p>
+   * Read all rows and columns from the table without any filter. The test asserts that the Arrow {@link
+   * VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the vectors
+   * contain the expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  @Ignore
+  public void testReadAllWithConstantRecords() throws Exception {
+    writeTableWithConstantRecords();
+    Table table = tables.load(tableLocation);
+    readAndCheckQueryResult(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read all rows and columns from the table without any filter. The test uses a batch size smaller than the number of
+   * rows in a partition. The test asserts that the Arrow {@link VectorSchemaRoot} contains the expected schema and
+   * expected vector types. Then the test asserts that the vectors contain the expected values. The test also asserts
+   * that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadAllWithSmallerBatchSize() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan();
+    readAndCheckQueryResult(scan, 10, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read selected rows and all columns from the table using a time range row filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the
+   * vectors contain the expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadRangeFilter() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    LocalDateTime beginTime = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
+    LocalDateTime endTime = LocalDateTime.of(2020, 2, 1, 0, 0, 0);
+    TableScan scan = table.newScan()
+        .filter(Expressions.and(
+            Expressions.greaterThanOrEqual("timestamp", timestampToMicros(beginTime)),
+            Expressions.lessThan("timestamp", timestampToMicros(endTime))));
+    readAndCheckQueryResult(scan, NUM_ROWS_PER_MONTH, NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read selected rows and all columns from the table using a time range row filter.
+   * The test asserts that the result is empty.
+   */
+  @Test
+  public void testReadRangeFilterEmptyResult() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    LocalDateTime beginTime = LocalDateTime.of(2021, 1, 1, 0, 0, 0);
+    LocalDateTime endTime = LocalDateTime.of(2021, 2, 1, 0, 0, 0);
+    TableScan scan = table.newScan()
+            .filter(Expressions.and(
+                    Expressions.greaterThanOrEqual("timestamp", timestampToMicros(beginTime)),
+                    Expressions.lessThan("timestamp", timestampToMicros(endTime))));
+    int numRoots = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, NUM_ROWS_PER_MONTH, false)) {
+      for (ColumnarBatch batch : itr) {
+        numRoots++;
+      }
+    }
+    assertEquals(0, numRoots);
+  }
+
+  /**
+   * Read all rows and selected columns from the table with a column selection filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the
+   * vectors contain the expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadColumnFilter1() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan()
+        .select("timestamp", "int", "string");
+    readAndCheckQueryResult(
+        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH,
+        ImmutableList.of("timestamp", "int", "string"));
+  }
+
+  /**
+   * Read all rows and a single column from the table with a column selection filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the
+   * vectors contain the expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadColumnFilter2() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan()
+        .select("timestamp");
+    readAndCheckQueryResult(
+        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH,
+        ImmutableList.of("timestamp"));
+  }
+
+  /**
+   * Run the following verifications:
+   * <ol>
+   *   <li>Read the data and verify that the returned ColumnarBatches match expected rows.</li>
+   *   <li>Read the data and verify that the returned Arrow VectorSchemaRoots match expected rows.</li>
+   * </ol>
+   */
+  private void readAndCheckQueryResult(
+      TableScan scan,
+      int numRowsPerRoot,
+      int expectedTotalRows,
+      List<String> columns) throws IOException {
+    // Read the data and verify that the returned ColumnarBatches match expected rows.
+    readAndCheckColumnarBatch(scan, numRowsPerRoot, columns);
+    // Read the data and verify that the returned Arrow VectorSchemaRoots match expected rows.
+    readAndCheckArrowResult(scan, numRowsPerRoot, expectedTotalRows, columns);
+  }
+
+  private void readAndCheckColumnarBatch(
+      TableScan scan,
+      int numRowsPerRoot,
+      List<String> columns) throws IOException {
+    int rowIndex = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + numRowsPerRoot);
+        checkColumnarBatch(numRowsPerRoot, expectedRows, batch, columns);
+        rowIndex += numRowsPerRoot;
+      }
+    }
+  }
+
+  private void readAndCheckArrowResult(
+      TableScan scan,
+      int numRowsPerRoot,
+      int expectedTotalRows,
+      List<String> columns) throws IOException {
+    Set<String> columnSet = ImmutableSet.copyOf(columns);
+    int rowIndex = 0;
+    int totalRows = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + numRowsPerRoot);
+        VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
+        assertEquals(createExpectedArrowSchema(columnSet), root.getSchema());
+        checkAllVectorTypes(root, columnSet);
+        checkAllVectorValues(numRowsPerRoot, expectedRows, root, columnSet);
+        rowIndex += numRowsPerRoot;
+        totalRows += root.getRowCount();
+      }
+    }
+    assertEquals(expectedTotalRows, totalRows);
+  }
+
+  private void checkColumnarBatch(
+      int expectedNumRows,
+      List<GenericRecord> expectedRows,
+      ColumnarBatch batch,
+      List<String> columns) {
+
+    Map<String, Integer> columnNameToIndex = new HashMap<>();
+    for (int i = 0; i < columns.size(); i++) {
+      columnNameToIndex.put(columns.get(i), i);
+    }
+    Set<String> columnSet = columnNameToIndex.keySet();
+
+    assertEquals(expectedNumRows, batch.numRows());
+    assertEquals(columns.size(), batch.numCols());
+
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("timestamp"),
+        columnSet, "timestamp",
+        (records, i) -> records.get(i).getField("timestamp"),
+        (array, i) -> timestampFromMicros(array.getLong(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("timestamp_nullable"),
+        columnSet, "timestamp_nullable",
+        (records, i) -> records.get(i).getField("timestamp_nullable"),
+        (array, i) -> timestampFromMicros(array.getLong(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("boolean"),
+        columnSet, "boolean",
+        (records, i) -> records.get(i).getField("boolean"),
+        ColumnVector::getBoolean
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("boolean_nullable"),
+        columnSet, "boolean_nullable",
+        (records, i) -> records.get(i).getField("boolean_nullable"),
+        ColumnVector::getBoolean
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("int"),
+        columnSet, "int",
+        (records, i) -> records.get(i).getField("int"),
+        ColumnVector::getInt
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("int_nullable"),
+        columnSet, "int_nullable",
+        (records, i) -> records.get(i).getField("int_nullable"),
+        ColumnVector::getInt
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("long"),
+        columnSet, "long",
+        (records, i) -> records.get(i).getField("long"),
+        ColumnVector::getLong
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("long_nullable"),
+        columnSet, "long_nullable",
+        (records, i) -> records.get(i).getField("long_nullable"),
+        ColumnVector::getLong
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("float"),
+        columnSet, "float",
+        (records, i) -> Float.floatToIntBits((float) records.get(i).getField("float")),
+        (array, i) -> Float.floatToIntBits(array.getFloat(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("float_nullable"),
+        columnSet, "float_nullable",
+        (records, i) -> Float.floatToIntBits((float) records.get(i).getField("float_nullable")),
+        (array, i) -> Float.floatToIntBits(array.getFloat(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("double"),
+        columnSet, "double",
+        (records, i) -> Double.doubleToLongBits((double) records.get(i).getField("double")),
+        (array, i) -> Double.doubleToLongBits(array.getDouble(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("double_nullable"),
+        columnSet, "double_nullable",
+        (records, i) -> Double.doubleToLongBits((double) records.get(i).getField("double_nullable")),
+        (array, i) -> Double.doubleToLongBits(array.getDouble(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("timestamp_tz"),
+        columnSet, "timestamp_tz",
+        (records, i) -> timestampToMicros((OffsetDateTime) records.get(i).getField("timestamp_tz")),
+        ColumnVector::getLong
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("timestamp_tz_nullable"),
+        columnSet, "timestamp_tz_nullable",
+        (records, i) -> timestampToMicros((OffsetDateTime) records.get(i).getField("timestamp_tz_nullable")),
+        ColumnVector::getLong
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("string"),
+        columnSet, "string",
+        (records, i) -> records.get(i).getField("string"),
+        ColumnVector::getString
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("string_nullable"),
+        columnSet, "string_nullable",
+        (records, i) -> records.get(i).getField("string_nullable"),
+        ColumnVector::getString
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("bytes"),
+        columnSet, "bytes",
+        (records, i) -> records.get(i).getField("bytes"),
+        (array, i) -> ByteBuffer.wrap(array.getBinary(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("bytes_nullable"),
+        columnSet, "bytes_nullable",
+        (records, i) -> records.get(i).getField("bytes_nullable"),
+        (array, i) -> ByteBuffer.wrap(array.getBinary(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("date"),
+        columnSet, "date",
+        (records, i) -> records.get(i).getField("date"),
+        (array, i) -> dateFromDay(array.getInt(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("date_nullable"),
+        columnSet, "date_nullable",
+        (records, i) -> records.get(i).getField("date_nullable"),
+        (array, i) -> dateFromDay(array.getInt(i))
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("int_promotion"),
+        columnSet, "int_promotion",
+        (records, i) -> records.get(i).getField("int_promotion"),
+        ColumnVector::getInt
+    );
+  }
+
+  private static void checkColumnarArrayValues(
+      int expectedNumRows,
+      List<GenericRecord> expectedRows,
+      ColumnarBatch columnBatch,
+      Integer columnIndex,
+      Set<String> columnSet,
+      String columnName,
+      BiFunction<List<GenericRecord>, Integer, Object> expectedValueExtractor,
+      BiFunction<ColumnVector, Integer, Object> vectorValueExtractor) {
+    if (columnSet.contains(columnName)) {
+      ColumnVector columnVector = columnBatch.column(columnIndex);
+      for (int i = 0; i < expectedNumRows; i++) {
+        Object expectedValue = expectedValueExtractor.apply(expectedRows, i);
+        Object actualValue = vectorValueExtractor.apply(columnVector, i);
+        assertEquals("Row#" + i + " mismatches", expectedValue, actualValue);
+      }
+    }
+  }
+
+  private void writeTableWithConstantRecords() throws Exception {
+    writeTable(true);
+  }
+
+  private void writeTableWithIncrementalRecords() throws Exception {
+    writeTable(false);
+  }
+
+  private void writeTable(boolean constantRecords) throws Exception {
+    rowsWritten = new ArrayList<>();
+    tables = new HadoopTables();
+    tableLocation = temp.newFolder("test").toString();
+
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "timestamp", Types.TimestampType.withoutZone()),
+        Types.NestedField.optional(2, "timestamp_nullable", Types.TimestampType.withoutZone()),
+        Types.NestedField.required(3, "boolean", Types.BooleanType.get()),
+        Types.NestedField.optional(4, "boolean_nullable", Types.BooleanType.get()),
+        Types.NestedField.required(5, "int", Types.IntegerType.get()),
+        Types.NestedField.optional(6, "int_nullable", Types.IntegerType.get()),
+        Types.NestedField.required(7, "long", Types.LongType.get()),
+        Types.NestedField.optional(8, "long_nullable", Types.LongType.get()),
+        Types.NestedField.required(9, "float", Types.FloatType.get()),
+        Types.NestedField.optional(10, "float_nullable", Types.FloatType.get()),
+        Types.NestedField.required(11, "double", Types.DoubleType.get()),
+        Types.NestedField.optional(12, "double_nullable", Types.DoubleType.get()),
+        Types.NestedField.required(13, "timestamp_tz", Types.TimestampType.withZone()),
+        Types.NestedField.optional(14, "timestamp_tz_nullable", Types.TimestampType.withZone()),
+        Types.NestedField.required(15, "string", Types.StringType.get()),
+        Types.NestedField.optional(16, "string_nullable", Types.StringType.get()),
+        Types.NestedField.required(17, "bytes", Types.BinaryType.get()),
+        Types.NestedField.optional(18, "bytes_nullable", Types.BinaryType.get()),
+        Types.NestedField.required(19, "date", Types.DateType.get()),
+        Types.NestedField.optional(20, "date_nullable", Types.DateType.get()),
+        Types.NestedField.required(21, "int_promotion", Types.IntegerType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+        .month("timestamp")
+        .build();
+
+    Table table = tables.create(schema, spec, tableLocation);
+
+    OverwriteFiles overwrite = table.newOverwrite();
+    for (int i = 1; i <= 12; i++) {
+      final List<GenericRecord> records;
+      if (constantRecords) {
+        records = createConstantRecordsForDate(
+            table.schema(), LocalDateTime.of(2020, i, 1, 0, 0, 0)
+        );
+      } else {
+        records = createIncrementalRecordsForDate(
+            table.schema(), LocalDateTime.of(2020, i, 1, 0, 0, 0)
+        );
+      }
+      overwrite.addFile(writeParquetFile(table, records));
+    }
+    overwrite.commit();
+
+    // Perform a type promotion
+    // TODO: The read Arrow vector should be of type BigInt (promoted type) but it is Int (old type).
+    Table tableLatest = tables.load(tableLocation);
+    tableLatest.updateSchema()
+        .updateColumn("int_promotion", Types.LongType.get())
+        .commit();
+  }
+
+  private static org.apache.arrow.vector.types.pojo.Schema createExpectedArrowSchema(Set<String> columnSet) {
+    List<Field> allFields = ImmutableList.of(
+        new Field(
+            "timestamp", new FieldType(false, MinorType.TIMESTAMPMICRO.getType(), null), null),
+        new Field(
+            "timestamp_nullable", new FieldType(true, MinorType.TIMESTAMPMICRO.getType(), null), null),
+        new Field(
+            "boolean", new FieldType(false, MinorType.BIT.getType(), null), null),
+        new Field(
+            "boolean_nullable", new FieldType(true, MinorType.BIT.getType(), null), null),
+        new Field(
+            "int", new FieldType(false, MinorType.INT.getType(), null), null),
+        new Field(
+            "int_nullable", new FieldType(true, MinorType.INT.getType(), null), null),
+        new Field(
+            "long", new FieldType(false, MinorType.BIGINT.getType(), null), null),
+        new Field(
+            "long_nullable", new FieldType(true, MinorType.BIGINT.getType(), null), null),
+        new Field(
+            "float", new FieldType(false, MinorType.FLOAT4.getType(), null), null),
+        new Field(
+            "float_nullable", new FieldType(true, MinorType.FLOAT4.getType(), null), null),
+        new Field(
+            "double", new FieldType(false, MinorType.FLOAT8.getType(), null), null),
+        new Field(
+            "double_nullable", new FieldType(true, MinorType.FLOAT8.getType(), null), null),
+        new Field(
+            "timestamp_tz", new FieldType(false, new ArrowType.Timestamp(
+                org.apache.arrow.vector.types.TimeUnit.MICROSECOND, "UTC"), null), null),
+        new Field(
+            "timestamp_tz_nullable", new FieldType(true, new ArrowType.Timestamp(
+                org.apache.arrow.vector.types.TimeUnit.MICROSECOND, "UTC"), null), null),
+        new Field(
+            "string", new FieldType(false, MinorType.VARCHAR.getType(), null), null),
+        new Field(
+            "string_nullable", new FieldType(true, MinorType.VARCHAR.getType(), null), null),
+        new Field(
+            "bytes", new FieldType(false, MinorType.VARBINARY.getType(), null), null),
+        new Field(
+            "bytes_nullable", new FieldType(true, MinorType.VARBINARY.getType(), null), null),
+        new Field(
+            "date", new FieldType(false, MinorType.DATEDAY.getType(), null), null),
+        new Field(
+            "date_nullable", new FieldType(true, MinorType.DATEDAY.getType(), null), null),
+        new Field(
+            "int_promotion", new FieldType(false, MinorType.INT.getType(), null), null)
+    );
+    List<Field> filteredFields = allFields.stream()
+        .filter(f -> columnSet.contains(f.getName()))
+        .collect(Collectors.toList());
+    return new org.apache.arrow.vector.types.pojo.Schema(filteredFields);
+  }
+
+  private List<GenericRecord> createIncrementalRecordsForDate(Schema schema, LocalDateTime datetime) {
+    List<GenericRecord> records = new ArrayList<>();
+    for (int i = 0; i < NUM_ROWS_PER_MONTH; i++) {
+      GenericRecord rec = GenericRecord.create(schema);
+      rec.setField("timestamp", datetime.plus(i, ChronoUnit.DAYS));
+      rec.setField("timestamp_nullable", datetime.plus(i, ChronoUnit.DAYS));
+      rec.setField("boolean", i % 2 == 0);
+      rec.setField("boolean_nullable", i % 2 == 0);
+      rec.setField("int", i);
+      rec.setField("int_nullable", i);
+      rec.setField("long", (long) i * 2);
+      rec.setField("long_nullable", (long) i * 2);
+      rec.setField("float", (float) i * 3);
+      rec.setField("float_nullable", (float) i * 3);
+      rec.setField("double", (double) i * 4);
+      rec.setField("double_nullable", (double) i * 4);
+      rec.setField("timestamp_tz", datetime.plus(i, ChronoUnit.MINUTES).atOffset(ZoneOffset.UTC));
+      rec.setField("timestamp_tz_nullable", datetime.plus(i, ChronoUnit.MINUTES).atOffset(ZoneOffset.UTC));
+      rec.setField("string", "String-" + i);
+      rec.setField("string_nullable", "String-" + i);
+      rec.setField("bytes", ByteBuffer.wrap(("Bytes-" + i).getBytes(StandardCharsets.UTF_8)));
+      rec.setField("bytes_nullable", ByteBuffer.wrap(("Bytes-" + i).getBytes(StandardCharsets.UTF_8)));
+      rec.setField("date", LocalDate.of(2020, 1, 1).plus(i, ChronoUnit.DAYS));
+      rec.setField("date_nullable", LocalDate.of(2020, 1, 1).plus(i, ChronoUnit.DAYS));
+      rec.setField("int_promotion", i);
+      records.add(rec);
+    }
+    return records;
+  }
+
+  private List<GenericRecord> createConstantRecordsForDate(Schema schema, LocalDateTime datetime) {
+    List<GenericRecord> records = new ArrayList<>();
+    for (int i = 0; i < NUM_ROWS_PER_MONTH; i++) {
+      GenericRecord rec = GenericRecord.create(schema);
+      rec.setField("timestamp", datetime);
+      rec.setField("timestamp_nullable", datetime);
+      rec.setField("boolean", true);
+      rec.setField("boolean_nullable", true);
+      rec.setField("int", 1);
+      rec.setField("int_nullable", 1);
+      rec.setField("long", 2L);
+      rec.setField("long_nullable", 2L);
+      rec.setField("float", 3.0f);
+      rec.setField("float_nullable", 3.0f);
+      rec.setField("double", 4.0);
+      rec.setField("double_nullable", 4.0);
+      rec.setField("timestamp_tz", datetime.atOffset(ZoneOffset.UTC));
+      rec.setField("timestamp_tz_nullable", datetime.atOffset(ZoneOffset.UTC));
+      rec.setField("string", "String");
+      rec.setField("string_nullable", "String");
+      rec.setField("bytes", ByteBuffer.wrap("Bytes".getBytes(StandardCharsets.UTF_8)));
+      rec.setField("bytes_nullable", ByteBuffer.wrap("Bytes".getBytes(StandardCharsets.UTF_8)));
+      rec.setField("date", LocalDate.of(2020, 1, 1));
+      rec.setField("date_nullable", LocalDate.of(2020, 1, 1));
+      rec.setField("int_promotion", 1);
+      records.add(rec);
+    }
+    return records;
+  }
+
+  private DataFile writeParquetFile(Table table, List<GenericRecord> records) throws IOException {
+    rowsWritten.addAll(records);
+    File parquetFile = temp.newFile();
+    assertTrue(parquetFile.delete());
+    FileAppender<GenericRecord> appender = Parquet.write(Files.localOutput(parquetFile))
+        .schema(table.schema())
+        .createWriterFunc(GenericParquetWriter::buildWriter)
+        .build();
+    try {
+      appender.addAll(records);
+    } finally {
+      appender.close();
+    }
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(new LocalDateTimeToLongMicros(records.get(0)));
+
+    return DataFiles.builder(table.spec())
+        .withPartition(partitionKey)
+        .withInputFile(localInput(parquetFile))
+        .withMetrics(appender.metrics())
+        .withFormat(FileFormat.PARQUET)
+        .build();
+  }
+
+  private static long timestampToMicros(LocalDateTime value) {
+    Instant instant = value.toInstant(ZoneOffset.UTC);
+    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
+  }
+
+  private static long timestampToMicros(OffsetDateTime value) {
+    Instant instant = value.toInstant();
+    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
+  }
+
+  private static LocalDateTime timestampFromMicros(long micros) {
+    return LocalDateTime.ofEpochSecond(
+        TimeUnit.MICROSECONDS.toSeconds(micros),
+        (int) TimeUnit.MICROSECONDS.toNanos(micros % 1_000_000),  // nano-of-second from the sub-second micros
+        ZoneOffset.UTC
+    );
+  }
+
+  private static LocalDate dateFromDay(int day) {
+    return LocalDate.ofEpochDay(day);
+  }
+
+  private void checkAllVectorTypes(VectorSchemaRoot root, Set<String> columnSet) {
+    assertEqualsForField(root, columnSet, "timestamp", TimeStampMicroVector.class);
+    assertEqualsForField(root, columnSet, "timestamp_nullable", TimeStampMicroVector.class);
+    assertEqualsForField(root, columnSet, "boolean", BitVector.class);
+    assertEqualsForField(root, columnSet, "boolean_nullable", BitVector.class);
+    assertEqualsForField(root, columnSet, "int", IntVector.class);
+    assertEqualsForField(root, columnSet, "int_nullable", IntVector.class);
+    assertEqualsForField(root, columnSet, "long", BigIntVector.class);
+    assertEqualsForField(root, columnSet, "long_nullable", BigIntVector.class);
+    assertEqualsForField(root, columnSet, "float", Float4Vector.class);
+    assertEqualsForField(root, columnSet, "float_nullable", Float4Vector.class);
+    assertEqualsForField(root, columnSet, "double", Float8Vector.class);
+    assertEqualsForField(root, columnSet, "double_nullable", Float8Vector.class);
+    assertEqualsForField(root, columnSet, "timestamp_tz", TimeStampMicroTZVector.class);
+    assertEqualsForField(root, columnSet, "timestamp_tz_nullable", TimeStampMicroTZVector.class);
+    assertEqualsForField(root, columnSet, "string", VarCharVector.class);
+    assertEqualsForField(root, columnSet, "string_nullable", VarCharVector.class);
+    assertEqualsForField(root, columnSet, "bytes", VarBinaryVector.class);
+    assertEqualsForField(root, columnSet, "bytes_nullable", VarBinaryVector.class);
+    assertEqualsForField(root, columnSet, "date", DateDayVector.class);
+    assertEqualsForField(root, columnSet, "date_nullable", DateDayVector.class);
+    assertEqualsForField(root, columnSet, "int_promotion", IntVector.class);
+  }
+
+  private void assertEqualsForField(
+      VectorSchemaRoot root, Set<String> columnSet, String columnName, Class<?> expected) {
+    if (columnSet.contains(columnName)) {
+      assertEquals(expected, root.getVector(columnName).getClass());
+    }
+  }
+
+  private void checkAllVectorValues(
+      int expectedNumRows,
+      List<GenericRecord> expectedRows,
+      VectorSchemaRoot root,
+      Set<String> columnSet) {
+    assertEquals(expectedNumRows, root.getRowCount());
+
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "timestamp",
+        (records, i) -> records.get(i).getField("timestamp"),
+        (vector, i) -> timestampFromMicros(((TimeStampMicroVector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "timestamp_nullable",
+        (records, i) -> records.get(i).getField("timestamp_nullable"),
+        (vector, i) -> timestampFromMicros(((TimeStampMicroVector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "boolean",
+        (records, i) -> records.get(i).getField("boolean"),
+        (vector, i) -> ((BitVector) vector).get(i) == 1
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "boolean_nullable",
+        (records, i) -> records.get(i).getField("boolean_nullable"),
+        (vector, i) -> ((BitVector) vector).get(i) == 1
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "int",
+        (records, i) -> records.get(i).getField("int"),
+        (vector, i) -> ((IntVector) vector).get(i)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "int_nullable",
+        (records, i) -> records.get(i).getField("int_nullable"),
+        (vector, i) -> ((IntVector) vector).get(i)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "long",
+        (records, i) -> records.get(i).getField("long"),
+        (vector, i) -> ((BigIntVector) vector).get(i)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "long_nullable",
+        (records, i) -> records.get(i).getField("long_nullable"),
+        (vector, i) -> ((BigIntVector) vector).get(i)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "float",
+        (records, i) -> Float.floatToIntBits((float) records.get(i).getField("float")),
+        (vector, i) -> Float.floatToIntBits(((Float4Vector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "float_nullable",
+        (records, i) -> Float.floatToIntBits((float) records.get(i).getField("float_nullable")),
+        (vector, i) -> Float.floatToIntBits(((Float4Vector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "double",
+        (records, i) -> Double.doubleToLongBits((double) records.get(i).getField("double")),
+        (vector, i) -> Double.doubleToLongBits(((Float8Vector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "double_nullable",
+        (records, i) -> Double.doubleToLongBits((double) records.get(i).getField("double_nullable")),
+        (vector, i) -> Double.doubleToLongBits(((Float8Vector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "timestamp_tz",
+        (records, i) -> timestampToMicros((OffsetDateTime) records.get(i).getField("timestamp_tz")),
+        (vector, i) -> ((TimeStampMicroTZVector) vector).get(i)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "timestamp_tz_nullable",
+        (records, i) -> timestampToMicros((OffsetDateTime) records.get(i).getField("timestamp_tz_nullable")),
+        (vector, i) -> ((TimeStampMicroTZVector) vector).get(i)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "string",
+        (records, i) -> records.get(i).getField("string"),
+        (vector, i) -> new String(((VarCharVector) vector).get(i), StandardCharsets.UTF_8)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "string_nullable",
+        (records, i) -> records.get(i).getField("string_nullable"),
+        (vector, i) -> new String(((VarCharVector) vector).get(i), StandardCharsets.UTF_8)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "bytes",
+        (records, i) -> records.get(i).getField("bytes"),
+        (vector, i) -> ByteBuffer.wrap(((VarBinaryVector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "bytes_nullable",
+        (records, i) -> records.get(i).getField("bytes_nullable"),
+        (vector, i) -> ByteBuffer.wrap(((VarBinaryVector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "date",
+        (records, i) -> records.get(i).getField("date"),
+        (vector, i) -> dateFromDay(((DateDayVector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "date_nullable",
+        (records, i) -> records.get(i).getField("date_nullable"),
+        (vector, i) -> dateFromDay(((DateDayVector) vector).get(i))
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "int_promotion",
+        (records, i) -> records.get(i).getField("int_promotion"),
+        (vector, i) -> ((IntVector) vector).get(i)
+    );
+  }
+
+  private static void checkVectorValues(
+      int expectedNumRows,
+      List<GenericRecord> expectedRows,
+      VectorSchemaRoot root,
+      Set<String> columnSet,
+      String columnName,
+      BiFunction<List<GenericRecord>, Integer, Object> expectedValueExtractor,
+      BiFunction<FieldVector, Integer, Object> vectorValueExtractor) {
+    if (columnSet.contains(columnName)) {
+      FieldVector vector = root.getVector(columnName);
+      assertEquals(expectedNumRows, vector.getValueCount());
+      for (int i = 0; i < expectedNumRows; i++) {
+        Object expectedValue = expectedValueExtractor.apply(expectedRows, i);
+        Object actualValue = vectorValueExtractor.apply(vector, i);
+        assertEquals("Row#" + i + " mismatches", expectedValue, actualValue);
+      }
+    }
+  }
+
+  private static final class LocalDateTimeToLongMicros implements StructLike {
+
+    private final Record row;
+
+    LocalDateTimeToLongMicros(Record row) {
+      this.row = row;
+    }
+
+    @Override
+    public int size() {
+      return row.size();
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      Object value = row.get(pos);
+      if (value instanceof LocalDateTime) {
+        @SuppressWarnings("unchecked")
+        T result = (T) (Long) timestampToMicros((LocalDateTime) value);
+        return result;
+      } else if (value instanceof OffsetDateTime) {
+        @SuppressWarnings("unchecked")
+        T result = (T) (Long) timestampToMicros(((OffsetDateTime) value).toLocalDateTime());
+        return result;
+      } else if (value != null) {
+        throw new IllegalArgumentException("Unsupported value type: " + value.getClass());
+      } else {
+        throw new IllegalArgumentException("Don't know how to handle null value");
+      }
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      row.set(pos, value);
+    }
+  }
+}
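
For orientation, the test above drives the new reader through the scan-level iterable added by this commit. A minimal usage sketch follows; the VectorizedTableScanIterable constructor and the ColumnarBatch batch accessor are taken from the files listed in this commit and should be read as assumptions of the sketch, not as a spec:

    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.arrow.vectorized.ColumnarBatch;
    import org.apache.iceberg.arrow.vectorized.VectorizedTableScanIterable;

    class ArrowScanSketch {
      // Sketch: stream Arrow batches from a table scan; each batch is exposed as a
      // VectorSchemaRoot, the same shape the checkVectorValues assertions inspect.
      static void readAllBatches(TableScan scan) throws Exception {
        try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan)) {
          for (ColumnarBatch batch : itr) {
            VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
            // e.g. root.getVector("string") is the FieldVector the assertions above read
            System.out.println("rows in batch: " + root.getRowCount());
          }
        }
      }
    }
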
diff --git a/build.gradle b/build.gradle
index 5dc6f1a..4f74ee5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -737,9 +737,17 @@ project(':iceberg-arrow') {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
     compile("org.apache.arrow:arrow-memory-netty") {
-      exclude group: 'io.netty', module: 'netty-common'
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
+      exclude group: 'io.netty', module: 'netty-common'
     }
+
+    testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
+    // The ArrowReaderTest cases need netty-common on the test classpath.
+    // We pull netty-common in through arrow-memory-netty so that the version
+    // matches the one used by the arrow-memory-netty module.
+    testCompile("org.apache.arrow:arrow-memory-netty")
+    testCompile("org.apache.hadoop:hadoop-common")
+    testCompile("org.apache.hadoop:hadoop-mapreduce-client-core")
   }
 }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
new file mode 100644
index 0000000..c63ce1e
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.math.BigDecimal;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+final class ArrowVectorAccessorFactory
+    extends GenericArrowVectorAccessorFactory<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> {
+
+  ArrowVectorAccessorFactory() {
+    super(DecimalFactoryImpl::new,
+        StringFactoryImpl::new,
+        StructChildFactoryImpl::new,
+        ArrayFactoryImpl::new);
+  }
+
+  private static final class DecimalFactoryImpl implements DecimalFactory<Decimal> {
+    @Override
+    public Class<Decimal> getGenericClass() {
+      return Decimal.class;
+    }
+
+    @Override
+    public Decimal ofLong(long value, int precision, int scale) {
+      return Decimal.apply(value, precision, scale);
+    }
+
+    @Override
+    public Decimal ofBigDecimal(BigDecimal value, int precision, int scale) {
+      return Decimal.apply(value, precision, scale);
+    }
+  }
+
+  private static final class StringFactoryImpl implements StringFactory<UTF8String> {
+    @Override
+    public Class<UTF8String> getGenericClass() {
+      return UTF8String.class;
+    }
+
+    @Override
+    public UTF8String ofRow(VarCharVector vector, int rowId) {
+      int start = vector.getStartOffset(rowId);
+      int end = vector.getEndOffset(rowId);
+
+      return UTF8String.fromAddress(
+          null,
+          vector.getDataBuffer().memoryAddress() + start,
+          end - start);
+    }
+
+    @Override
+    public UTF8String ofBytes(byte[] bytes) {
+      return UTF8String.fromBytes(bytes);
+    }
+  }
+
+  private static final class ArrayFactoryImpl implements ArrayFactory<ArrowColumnVector, ColumnarArray> {
+    @Override
+    public ArrowColumnVector ofChild(ValueVector childVector) {
+      return new ArrowColumnVector(childVector);
+    }
+
+    @Override
+    public ColumnarArray ofRow(ValueVector vector, ArrowColumnVector childData, int rowId) {
+      ArrowBuf offsets = vector.getOffsetBuffer();
+      int index = rowId * ListVector.OFFSET_WIDTH;
+      int start = offsets.getInt(index);
+      int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
+      return new ColumnarArray(childData, start, end - start);
+    }
+  }
+
+  private static final class StructChildFactoryImpl implements StructChildFactory<ArrowColumnVector> {
+    @Override
+    public Class<ArrowColumnVector> getGenericClass() {
+      return ArrowColumnVector.class;
+    }
+
+    @Override
+    public ArrowColumnVector of(ValueVector childVector) {
+      return new ArrowColumnVector(childVector);
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
index fb5f443..f3b3377 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
@@ -19,506 +19,22 @@
 
 package org.apache.iceberg.spark.data.vectorized;
 
-import java.math.BigInteger;
-import java.util.stream.IntStream;
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.complex.StructVector;
-import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor;
 import org.apache.iceberg.arrow.vectorized.VectorHolder;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.PrimitiveType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.unsafe.types.UTF8String;
-import org.jetbrains.annotations.NotNull;
 
 public class ArrowVectorAccessors {
 
-  private ArrowVectorAccessors() {
-  }
-
-  static ArrowVectorAccessor getVectorAccessor(VectorHolder holder) {
-    Dictionary dictionary = holder.dictionary();
-    boolean isVectorDictEncoded = holder.isDictionaryEncoded();
-    FieldVector vector = holder.vector();
-    if (isVectorDictEncoded) {
-      ColumnDescriptor desc = holder.descriptor();
-      PrimitiveType primitive = desc.getPrimitiveType();
-      return getDictionaryVectorAccessor(dictionary, desc, vector, primitive);
-    } else {
-      return getPlainVectorAccessor(vector);
-    }
-  }
-
-  @NotNull
-  private static ArrowVectorAccessor getDictionaryVectorAccessor(
-      Dictionary dictionary,
-      ColumnDescriptor desc,
-      FieldVector vector, PrimitiveType primitive) {
-    Preconditions.checkState(vector instanceof IntVector, "Dictionary ids should be stored in IntVectors only");
-    if (primitive.getOriginalType() != null) {
-      switch (desc.getPrimitiveType().getOriginalType()) {
-        case ENUM:
-        case JSON:
-        case UTF8:
-        case BSON:
-          return new DictionaryStringAccessor((IntVector) vector, dictionary);
-        case INT_64:
-        case TIMESTAMP_MILLIS:
-        case TIMESTAMP_MICROS:
-          return new DictionaryLongAccessor((IntVector) vector, dictionary);
-        case DECIMAL:
-          switch (primitive.getPrimitiveTypeName()) {
-            case BINARY:
-            case FIXED_LEN_BYTE_ARRAY:
-              return new DictionaryDecimalBinaryAccessor(
-                  (IntVector) vector,
-                  dictionary);
-            case INT64:
-              return new DictionaryDecimalLongAccessor(
-                  (IntVector) vector,
-                  dictionary);
-            case INT32:
-              return new DictionaryDecimalIntAccessor(
-                  (IntVector) vector,
-                  dictionary);
-            default:
-              throw new UnsupportedOperationException(
-                  "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
-          }
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported logical type: " + primitive.getOriginalType());
-      }
-    } else {
-      switch (primitive.getPrimitiveTypeName()) {
-        case FIXED_LEN_BYTE_ARRAY:
-        case BINARY:
-          return new DictionaryBinaryAccessor((IntVector) vector, dictionary);
-        case FLOAT:
-          return new DictionaryFloatAccessor((IntVector) vector, dictionary);
-        case INT64:
-          return new DictionaryLongAccessor((IntVector) vector, dictionary);
-        case DOUBLE:
-          return new DictionaryDoubleAccessor((IntVector) vector, dictionary);
-        default:
-          throw new UnsupportedOperationException("Unsupported type: " + primitive);
-      }
-    }
-  }
-
-  @NotNull
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  private static ArrowVectorAccessor getPlainVectorAccessor(FieldVector vector) {
-    if (vector instanceof BitVector) {
-      return new BooleanAccessor((BitVector) vector);
-    } else if (vector instanceof IntVector) {
-      return new IntAccessor((IntVector) vector);
-    } else if (vector instanceof BigIntVector) {
-      return new LongAccessor((BigIntVector) vector);
-    } else if (vector instanceof Float4Vector) {
-      return new FloatAccessor((Float4Vector) vector);
-    } else if (vector instanceof Float8Vector) {
-      return new DoubleAccessor((Float8Vector) vector);
-    } else if (vector instanceof DecimalVector) {
-      return new DecimalAccessor((DecimalVector) vector);
-    } else if (vector instanceof VarCharVector) {
-      return new StringAccessor((VarCharVector) vector);
-    } else if (vector instanceof VarBinaryVector) {
-      return new BinaryAccessor((VarBinaryVector) vector);
-    } else if (vector instanceof DateDayVector) {
-      return new DateAccessor((DateDayVector) vector);
-    } else if (vector instanceof TimeStampMicroTZVector) {
-      return new TimestampAccessor((TimeStampMicroTZVector) vector);
-    } else if (vector instanceof ListVector) {
-      ListVector listVector = (ListVector) vector;
-      return new ArrayAccessor(listVector);
-    } else if (vector instanceof StructVector) {
-      StructVector structVector = (StructVector) vector;
-      return new StructAccessor(structVector);
-    }
-    throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
-  }
-
-  private static class BooleanAccessor extends ArrowVectorAccessor {
-
-    private final BitVector vector;
-
-    BooleanAccessor(BitVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final boolean getBoolean(int rowId) {
-      return vector.get(rowId) == 1;
-    }
-  }
-
-  private static class IntAccessor extends ArrowVectorAccessor {
-
-    private final IntVector vector;
-
-    IntAccessor(IntVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final int getInt(int rowId) {
-      return vector.get(rowId);
-    }
-
-    @Override
-    final long getLong(int rowId) {
-      return getInt(rowId);
-    }
-  }
-
-  private static class LongAccessor extends ArrowVectorAccessor {
-
-    private final BigIntVector vector;
-
-    LongAccessor(BigIntVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final long getLong(int rowId) {
-      return vector.get(rowId);
-    }
-  }
-
-  private static class DictionaryLongAccessor extends ArrowVectorAccessor {
-    private final IntVector offsetVector;
-    private final long[] decodedDictionary;
-
-    DictionaryLongAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector);
-      this.offsetVector = vector;
-      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
-          .mapToLong(dictionary::decodeToLong)
-          .toArray();
-    }
+  private static final ArrowVectorAccessorFactory factory = new ArrowVectorAccessorFactory();
 
-    @Override
-    final long getLong(int rowId) {
-      return decodedDictionary[offsetVector.get(rowId)];
-    }
+  static ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector>
+      getVectorAccessor(VectorHolder holder) {
+    return factory.getVectorAccessor(holder);
   }
 
-  private static class FloatAccessor extends ArrowVectorAccessor {
-
-    private final Float4Vector vector;
-
-    FloatAccessor(Float4Vector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final float getFloat(int rowId) {
-      return vector.get(rowId);
-    }
-
-    @Override
-    final double getDouble(int rowId) {
-      return getFloat(rowId);
-    }
-  }
-
-  private static class DictionaryFloatAccessor extends ArrowVectorAccessor {
-    private final IntVector offsetVector;
-    private final float[] decodedDictionary;
-
-    DictionaryFloatAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector);
-      this.offsetVector = vector;
-      this.decodedDictionary = new float[dictionary.getMaxId() + 1];
-      for (int i = 0; i <= dictionary.getMaxId(); i++) {
-        decodedDictionary[i] = dictionary.decodeToFloat(i);
-      }
-    }
-
-    @Override
-    final float getFloat(int rowId) {
-      return decodedDictionary[offsetVector.get(rowId)];
-    }
-
-    @Override
-    final double getDouble(int rowId) {
-      return getFloat(rowId);
-    }
-  }
-
-  private static class DoubleAccessor extends ArrowVectorAccessor {
-
-    private final Float8Vector vector;
-
-    DoubleAccessor(Float8Vector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final double getDouble(int rowId) {
-      return vector.get(rowId);
-    }
-  }
-
-  private static class DictionaryDoubleAccessor extends ArrowVectorAccessor {
-    private final IntVector offsetVector;
-    private final double[] decodedDictionary;
-
-    DictionaryDoubleAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector);
-      this.offsetVector = vector;
-      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
-          .mapToDouble(dictionary::decodeToDouble)
-          .toArray();
-    }
-
-    @Override
-    final double getDouble(int rowId) {
-      return decodedDictionary[offsetVector.get(rowId)];
-    }
-  }
-
-  private static class StringAccessor extends ArrowVectorAccessor {
-
-    private final VarCharVector vector;
-
-    StringAccessor(VarCharVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final UTF8String getUTF8String(int rowId) {
-      int start = vector.getStartOffset(rowId);
-      int end = vector.getEndOffset(rowId);
-
-      return UTF8String.fromAddress(
-          null,
-          vector.getDataBuffer().memoryAddress() + start,
-          end - start);
-    }
-  }
-
-  private static class DictionaryStringAccessor extends ArrowVectorAccessor {
-    private final UTF8String[] decodedDictionary;
-    private final IntVector offsetVector;
-
-    DictionaryStringAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector);
-      this.offsetVector = vector;
-      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
-          .mapToObj(dictionary::decodeToBinary)
-          .map(binary -> UTF8String.fromBytes(binary.getBytes()))
-          .toArray(UTF8String[]::new);
-    }
-
-    @Override
-    final UTF8String getUTF8String(int rowId) {
-      int offset = offsetVector.get(rowId);
-      return decodedDictionary[offset];
-    }
-  }
-
-  private static class BinaryAccessor extends ArrowVectorAccessor {
-
-    private final VarBinaryVector vector;
-
-    BinaryAccessor(VarBinaryVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final byte[] getBinary(int rowId) {
-      return vector.get(rowId);
-    }
-  }
-
-  private static class DictionaryBinaryAccessor extends ArrowVectorAccessor {
-    private final IntVector offsetVector;
-    private final byte[][] decodedDictionary;
-
-    DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector);
-      this.offsetVector = vector;
-      this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
-          .mapToObj(dictionary::decodeToBinary)
-          .map(Binary::getBytes)
-          .toArray(byte[][]::new);
-    }
-
-    @Override
-    final byte[] getBinary(int rowId) {
-      int offset = offsetVector.get(rowId);
-      return decodedDictionary[offset];
-    }
-  }
-
-  private static class DateAccessor extends ArrowVectorAccessor {
-
-    private final DateDayVector vector;
-
-    DateAccessor(DateDayVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final int getInt(int rowId) {
-      return vector.get(rowId);
-    }
-  }
-
-  private static class TimestampAccessor extends ArrowVectorAccessor {
-
-    private final TimeStampMicroTZVector vector;
-
-    TimestampAccessor(TimeStampMicroTZVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final long getLong(int rowId) {
-      return vector.get(rowId);
-    }
-  }
-
-  private static class ArrayAccessor extends ArrowVectorAccessor {
-
-    private final ListVector vector;
-    private final ArrowColumnVector arrayData;
-
-    ArrayAccessor(ListVector vector) {
-      super(vector);
-      this.vector = vector;
-      this.arrayData = new ArrowColumnVector(vector.getDataVector());
-    }
-
-    @Override
-    final ColumnarArray getArray(int rowId) {
-      ArrowBuf offsets = vector.getOffsetBuffer();
-      int index = rowId * ListVector.OFFSET_WIDTH;
-      int start = offsets.getInt(index);
-      int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
-      return new ColumnarArray(arrayData, start, end - start);
-    }
-  }
-
-  /**
-   * Use {@link IcebergArrowColumnVector#getChild(int)} to get hold of the {@link ArrowColumnVector} vectors holding the
-   * struct values.
-   */
-  private static class StructAccessor extends ArrowVectorAccessor {
-    StructAccessor(StructVector structVector) {
-      super(structVector, IntStream.range(0, structVector.size())
-          .mapToObj(structVector::getVectorById)
-          .map(ArrowColumnVector::new)
-          .toArray(ArrowColumnVector[]::new));
-    }
-  }
-
-  private static class DecimalAccessor extends ArrowVectorAccessor {
-
-    private final DecimalVector vector;
-
-    DecimalAccessor(DecimalVector vector) {
-      super(vector);
-      this.vector = vector;
-    }
-
-    @Override
-    final Decimal getDecimal(int rowId, int precision, int scale) {
-      return Decimal.apply(DecimalUtility.getBigDecimalFromArrowBuf(vector.getDataBuffer(), rowId, scale),
-          precision, scale);
-    }
-  }
-
-  @SuppressWarnings("checkstyle:VisibilityModifier")
-  private abstract static class DictionaryDecimalAccessor extends ArrowVectorAccessor {
-    final Decimal[] cache;
-    Dictionary parquetDictionary;
-    final IntVector offsetVector;
-
-    private DictionaryDecimalAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector);
-      this.offsetVector = vector;
-      this.parquetDictionary = dictionary;
-      this.cache = new Decimal[dictionary.getMaxId() + 1];
-    }
-  }
-
-  private static class DictionaryDecimalBinaryAccessor extends DictionaryDecimalAccessor {
-
-    DictionaryDecimalBinaryAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector, dictionary);
-    }
-
-    @Override
-    final Decimal getDecimal(int rowId, int precision, int scale) {
-      int dictId = offsetVector.get(rowId);
-      if (cache[dictId] == null) {
-        cache[dictId] = Decimal.apply(
-            new BigInteger(parquetDictionary.decodeToBinary(dictId).getBytes()).longValue(),
-            precision,
-            scale);
-      }
-      return cache[dictId];
-    }
-  }
-
-  private static class DictionaryDecimalLongAccessor extends DictionaryDecimalAccessor {
-
-    DictionaryDecimalLongAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector, dictionary);
-    }
-
-    @Override
-    final Decimal getDecimal(int rowId, int precision, int scale) {
-      int dictId = offsetVector.get(rowId);
-      if (cache[dictId] == null) {
-        cache[dictId] = Decimal.apply(parquetDictionary.decodeToLong(dictId), precision, scale);
-      }
-      return cache[dictId];
-    }
-  }
-
-  private static class DictionaryDecimalIntAccessor extends DictionaryDecimalAccessor {
-
-    DictionaryDecimalIntAccessor(IntVector vector, Dictionary dictionary) {
-      super(vector, dictionary);
-    }
-
-    @Override
-    final Decimal getDecimal(int rowId, int precision, int scale) {
-      int dictId = offsetVector.get(rowId);
-      if (cache[dictId] == null) {
-        cache[dictId] = Decimal.apply(parquetDictionary.decodeToInt(dictId), precision, scale);
-      }
-      return cache[dictId];
-    }
+  private ArrowVectorAccessors() {
   }
 }
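
The Spark-facing behavior is unchanged: callers still obtain one accessor per VectorHolder, only now the accessor is typed on Spark's value classes. A minimal call-site sketch, assuming the per-type getters (getUTF8String, getDecimal, getArray) carried over from the removed accessors above:

    package org.apache.iceberg.spark.data.vectorized;  // getVectorAccessor is package-private

    import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor;
    import org.apache.iceberg.arrow.vectorized.VectorHolder;
    import org.apache.spark.sql.types.Decimal;
    import org.apache.spark.sql.vectorized.ArrowColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarArray;
    import org.apache.spark.unsafe.types.UTF8String;

    class AccessorUsageSketch {
      // Sketch: because the accessor is parameterized on Spark's value types, string
      // reads come back as UTF8String without any extra conversion step.
      static UTF8String readString(VectorHolder holder, int rowId) {
        ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor =
            ArrowVectorAccessors.getVectorAccessor(holder);
        return accessor.getUTF8String(rowId);
      }
    }
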
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index 9b973f4..514eec8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.spark.data.vectorized;
 
+import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor;
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
 import org.apache.iceberg.arrow.vectorized.VectorHolder;
 import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
@@ -38,7 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public class IcebergArrowColumnVector extends ColumnVector {
 
-  private final ArrowVectorAccessor accessor;
+  private final ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor;
   private final NullabilityHolder nullabilityHolder;
 
   public IcebergArrowColumnVector(VectorHolder holder) {
@@ -150,7 +151,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
         new IcebergArrowColumnVector(holder);
   }
 
-  public ArrowVectorAccessor vectorAccessor() {
+  public ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> vectorAccessor() {
     return accessor;
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 818f405..b2d5823 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -19,26 +19,12 @@
 
 package org.apache.iceberg.spark.data.vectorized;
 
-import java.util.List;
 import java.util.Map;
-import java.util.stream.IntStream;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.arrow.ArrowAllocation;
-import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
-import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.ConstantVectorReader;
+import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.parquet.VectorizedReader;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
 
 public class VectorizedSparkParquetReaders {
 
@@ -59,93 +45,8 @@ public class VectorizedSparkParquetReaders {
       Map<Integer, ?> idToConstant) {
     return (ColumnarBatchReader)
         TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-            new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector, idToConstant));
-  }
-
-  private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
-    private final MessageType parquetSchema;
-    private final Schema icebergSchema;
-    private final BufferAllocator rootAllocator;
-    private final Map<Integer, ?> idToConstant;
-    private final boolean setArrowValidityVector;
-
-    VectorizedReaderBuilder(
-        Schema expectedSchema,
-        MessageType parquetSchema,
-        boolean setArrowValidityVector, Map<Integer, ?> idToConstant) {
-      this.parquetSchema = parquetSchema;
-      this.icebergSchema = expectedSchema;
-      this.rootAllocator = ArrowAllocation.rootAllocator()
-          .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
-      this.setArrowValidityVector = setArrowValidityVector;
-      this.idToConstant = idToConstant;
-    }
-
-    @Override
-    public VectorizedReader<?> message(
-            Types.StructType expected, MessageType message,
-            List<VectorizedReader<?>> fieldReaders) {
-      GroupType groupType = message.asGroupType();
-      Map<Integer, VectorizedReader<?>> readersById = Maps.newHashMap();
-      List<Type> fields = groupType.getFields();
-
-      IntStream.range(0, fields.size())
-          .filter(pos -> fields.get(pos).getId() != null)
-          .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos)));
-
-      List<Types.NestedField> icebergFields = expected != null ?
-          expected.fields() : ImmutableList.of();
-
-      List<VectorizedReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
-          icebergFields.size());
-
-      for (Types.NestedField field : icebergFields) {
-        int id = field.fieldId();
-        VectorizedReader<?> reader = readersById.get(id);
-        if (idToConstant.containsKey(id)) {
-          reorderedFields.add(new ConstantVectorReader<>(idToConstant.get(id)));
-        } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
-          reorderedFields.add(VectorizedArrowReader.positions());
-        } else if (reader != null) {
-          reorderedFields.add(reader);
-        } else {
-          reorderedFields.add(VectorizedArrowReader.nulls());
-        }
-      }
-      return new ColumnarBatchReader(reorderedFields);
-    }
-
-    @Override
-    public VectorizedReader<?> struct(
-        Types.StructType expected, GroupType groupType,
-        List<VectorizedReader<?>> fieldReaders) {
-      if (expected != null) {
-        throw new UnsupportedOperationException("Vectorized reads are not supported yet for struct fields");
-      }
-      return null;
-    }
-
-    @Override
-    public VectorizedReader<?> primitive(
-        org.apache.iceberg.types.Type.PrimitiveType expected,
-        PrimitiveType primitive) {
-
-      // Create arrow vector for this field
-      if (primitive.getId() == null) {
-        return null;
-      }
-      int parquetFieldId = primitive.getId().intValue();
-      ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
-      // Nested types not yet supported for vectorized reads
-      if (desc.getMaxRepetitionLevel() > 0) {
-        return null;
-      }
-      Types.NestedField icebergField = icebergSchema.findField(parquetFieldId);
-      if (icebergField == null) {
-        return null;
-      }
-      // Set the validity buffer if null checking is enabled in arrow
-      return new VectorizedArrowReader(desc, icebergField, rootAllocator, setArrowValidityVector);
-    }
+            new VectorizedReaderBuilder(
+                expectedSchema, fileSchema, setArrowValidityVector,
+                idToConstant, ColumnarBatchReader::new));
   }
 }
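
The net effect of this change: the reader-building visitor now lives in iceberg-arrow as VectorizedReaderBuilder, and the Spark module only injects how the reordered per-column readers are assembled into a batch reader. A sketch of the injected factory's assumed shape (the Function type below is an assumption inferred from the ColumnarBatchReader::new argument above):

    package org.apache.iceberg.spark.data.vectorized;  // same package as ColumnarBatchReader

    import java.util.List;
    import java.util.function.Function;
    import org.apache.iceberg.parquet.VectorizedReader;

    class ReaderFactorySketch {
      // Assumed shape of the last constructor argument above: given the reordered
      // per-column readers, build the engine-specific batch reader. Spark passes
      // ColumnarBatchReader::new; the arrow module would pass ArrowBatchReader::new.
      static final Function<List<VectorizedReader<?>>, VectorizedReader<?>> SPARK_BATCH_FACTORY =
          ColumnarBatchReader::new;
    }
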