Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/22 00:36:48 UTC

[incubator-iceberg] branch vectorized-read updated: Fix various checkstyle issues (#667)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/vectorized-read by this push:
     new ecc9c8d  Fix various checkstyle issues (#667)
ecc9c8d is described below

commit ecc9c8dccd0d1f0c86fa8b1814b168ed6a480050
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Thu Nov 21 16:36:38 2019 -0800

    Fix various checkstyle issues (#667)
---
 build.gradle                                       |   10 +-
 gradle/wrapper/gradle-wrapper.properties           |   24 +-
 .../org/apache/iceberg/parquet/ParquetUtil.java    |  270 ++-
 .../{BytesReader.java => ValuesAsBytesReader.java} |   48 +-
 .../iceberg/parquet/arrow/ArrowSchemaUtil.java     |  147 ++
 .../arrow/IcebergArrowColumnVector.java            |  100 +-
 .../arrow/IcebergDecimalArrowVector.java           |    8 +-
 .../arrow/IcebergVarBinaryArrowVector.java         |    6 +-
 .../arrow/IcebergVarcharArrowVector.java           |    8 +-
 .../parquet/vectorized/ColumnarBatchReaders.java   |  112 +-
 .../parquet/vectorized/NullabilityHolder.java      |   16 +-
 .../iceberg/parquet/vectorized/VectorHolder.java   |   61 +-
 .../parquet/vectorized/VectorizedArrowReader.java  |   56 +-
 .../vectorized/VectorizedColumnIterator.java       |   54 +-
 .../parquet/vectorized/VectorizedPageIterator.java |  856 +++----
 .../vectorized/VectorizedParquetValuesReader.java  | 2334 +++++++++++---------
 .../org/apache/iceberg/spark/data/TestHelpers.java |   11 +-
 versions.props                                     |    1 +
 18 files changed, 2275 insertions(+), 1847 deletions(-)

diff --git a/build.gradle b/build.gradle
index 8556cf3..9071a61 100644
--- a/build.gradle
+++ b/build.gradle
@@ -252,6 +252,12 @@ project(':iceberg-core') {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
+    compile("org.apache.arrow:arrow-vector") {
+      exclude group: 'io.netty', module: 'netty-buffer'
+      exclude group: 'io.netty', module: 'netty-common'
+    }
+
+    testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
   }
 }
 
@@ -264,7 +270,6 @@ project(':iceberg-data') {
     compileOnly("org.apache.spark:spark-hive_2.11") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
-
     testCompile("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
@@ -346,6 +351,9 @@ project(':iceberg-parquet') {
     compileOnly("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
+    compileOnly("org.apache.spark:spark-hive_2.11") {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
 
     testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
   }
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 1277f7d..768a708 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,24 +1,6 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
+#Wed Nov 20 14:10:01 PST 2019
+distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip
+zipStoreBase=GRADLE_USER_HOME
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 4cab3d9..dbfeb41 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -22,6 +22,15 @@ package org.apache.iceberg.parquet;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.MetricsModes;
@@ -33,8 +42,9 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.UnicodeUtil;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.statistics.Statistics;
@@ -47,18 +57,6 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import static org.apache.iceberg.parquet.ParquetConversions.fromParquetPrimitive;
-import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMax;
-import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMin;
-import static org.apache.iceberg.util.UnicodeUtil.truncateStringMax;
-import static org.apache.iceberg.util.UnicodeUtil.truncateStringMin;
-import static org.apache.parquet.schema.OriginalType.*;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
-
 public class ParquetUtil {
   // not meant to be instantiated
   private ParquetUtil() {
@@ -98,7 +96,7 @@ public class ParquetUtil {
         increment(columnSizes, fieldId, column.getTotalSize());
 
         String columnName = fileSchema.findColumnName(fieldId);
-        MetricsMode metricsMode = metricsConfig.columnMode(columnName);
+        MetricsModes.MetricsMode metricsMode = metricsConfig.columnMode(columnName);
         if (metricsMode == MetricsModes.None.get()) {
           continue;
         }
@@ -113,9 +111,9 @@ public class ParquetUtil {
           if (metricsMode != MetricsModes.Counts.get()) {
             Types.NestedField field = fileSchema.findField(fieldId);
             if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) {
-              Literal<?> min = fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin());
+              Literal<?> min = ParquetConversions.fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin());
               updateMin(lowerBounds, fieldId, field.type(), min, metricsMode);
-              Literal<?> max = fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax());
+              Literal<?> max = ParquetConversions.fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax());
               updateMax(upperBounds, fieldId, field.type(), max, metricsMode);
             }
           }
@@ -134,10 +132,8 @@ public class ParquetUtil {
         toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
   }
 
-
   /**
-   * @return a list of offsets in ascending order determined by the starting position
-   * of the row groups
+   * @return a list of offsets in ascending order determined by the starting position of the row groups
    */
   public static List<Long> getSplitOffsets(ParquetMetadata md) {
     List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
@@ -175,8 +171,9 @@ public class ParquetUtil {
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type,
-                                    Literal<T> min, MetricsMode metricsMode) {
+  private static <T> void updateMin(
+      Map<Integer, Literal<?>> lowerBounds, int id, Type type,
+      Literal<T> min, MetricsMode metricsMode) {
     Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
     if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
       if (metricsMode == MetricsModes.Full.get()) {
@@ -186,11 +183,11 @@ public class ParquetUtil {
         int truncateLength = truncateMode.length();
         switch (type.typeId()) {
           case STRING:
-            lowerBounds.put(id, truncateStringMin((Literal<CharSequence>) min, truncateLength));
+            lowerBounds.put(id, UnicodeUtil.truncateStringMin((Literal<CharSequence>) min, truncateLength));
             break;
           case FIXED:
           case BINARY:
-            lowerBounds.put(id, truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
+            lowerBounds.put(id, BinaryUtil.truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
             break;
           default:
             lowerBounds.put(id, min);
@@ -200,8 +197,9 @@ public class ParquetUtil {
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type,
-                                    Literal<T> max, MetricsMode metricsMode) {
+  private static <T> void updateMax(
+      Map<Integer, Literal<?>> upperBounds, int id, Type type,
+      Literal<T> max, MetricsMode metricsMode) {
     Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
     if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
       if (metricsMode == MetricsModes.Full.get()) {
@@ -211,11 +209,11 @@ public class ParquetUtil {
         int truncateLength = truncateMode.length();
         switch (type.typeId()) {
           case STRING:
-            upperBounds.put(id, truncateStringMax((Literal<CharSequence>) max, truncateLength));
+            upperBounds.put(id, UnicodeUtil.truncateStringMax((Literal<CharSequence>) max, truncateLength));
             break;
           case FIXED:
           case BINARY:
-            upperBounds.put(id, truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
+            upperBounds.put(id, BinaryUtil.truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
             break;
           default:
             upperBounds.put(id, max);
@@ -227,148 +225,132 @@ public class ParquetUtil {
   private static Map<Integer, ByteBuffer> toBufferMap(Schema schema, Map<Integer, Literal<?>> map) {
     Map<Integer, ByteBuffer> bufferMap = Maps.newHashMap();
     for (Map.Entry<Integer, Literal<?>> entry : map.entrySet()) {
-      bufferMap.put(entry.getKey(),
+      bufferMap.put(
+          entry.getKey(),
           Conversions.toByteBuffer(schema.findType(entry.getKey()), entry.getValue().value()));
     }
     return bufferMap;
   }
 
-  public static boolean isFixedLengthDecimal(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    return (primitive.getOriginalType() != null &&
-            primitive.getOriginalType() == DECIMAL &&
-            (primitive.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY ||
-                    primitive.getPrimitiveTypeName() == BINARY));
-  }
-
-  public static boolean isIntLongBackedDecimal(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    return (primitive.getOriginalType() != null &&
-            primitive.getOriginalType() == DECIMAL &&
-            (primitive.getPrimitiveTypeName() == INT64 ||
-                    primitive.getPrimitiveTypeName() == INT32));
-  }
-
-  public static boolean isVarWidthType(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    if (originalType != null &&
-            originalType != DECIMAL &&
-            (originalType == ENUM ||
-                    originalType == OriginalType.JSON ||
-                    originalType == UTF8 ||
-                    originalType == BSON)) {
-      return true;
+    public static boolean isFixedLengthDecimal(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        return primitive.getOriginalType() != null &&
+                primitive.getOriginalType() == OriginalType.DECIMAL &&
+                (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ||
+                        primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY);
     }
-    if (originalType == null && primitive.getPrimitiveTypeName() == BINARY) {
-      return true;
-    }
-    return false;
-  }
 
-  public static boolean isBooleanType(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    return (originalType == null && primitive.getPrimitiveTypeName() == BOOLEAN);
-  }
+    public static boolean isIntLongBackedDecimal(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        return primitive.getOriginalType() != null &&
+                primitive.getOriginalType() == OriginalType.DECIMAL &&
+                (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64 ||
+                        primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32);
+    }
 
-  public static boolean isFixedWidthBinary(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    if (originalType == null && primitive.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY) {
-      return true;
+    public static boolean isVarWidthType(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        OriginalType originalType = primitive.getOriginalType();
+        if (originalType != null &&
+                originalType != OriginalType.DECIMAL &&
+                (originalType == OriginalType.ENUM ||
+                        originalType == OriginalType.JSON ||
+                        originalType == OriginalType.UTF8 ||
+                        originalType == OriginalType.BSON)) {
+            return true;
+        }
+        if (originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
+            return true;
+        }
+        return false;
     }
-    return false;
-  }
 
-  public static boolean isNumericNonDecimalType(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    if (originalType != null) {
-      if (originalType == INT_8 || originalType == INT_16 || originalType == INT_32
-          || originalType == DATE || originalType == INT_64 || originalType == TIMESTAMP_MILLIS
-          || originalType == TIMESTAMP_MICROS) {
-        return true;
-      }
-    } else {
-      PrimitiveType.PrimitiveTypeName primitiveTypeName = primitive.getPrimitiveTypeName();
-      if (primitiveTypeName == INT64 || primitiveTypeName == INT32 || primitiveTypeName == FLOAT ||
-          primitiveTypeName == DOUBLE) {
-        return true;
-      }
+    public static boolean isBooleanType(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        OriginalType originalType = primitive.getOriginalType();
+        return originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BOOLEAN;
     }
-    return false;
-  }
 
-  public static boolean isIntType(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    if (originalType != null && (originalType ==  INT_8 || originalType == INT_16 || originalType == INT_32 || originalType == DATE)) {
-      return true;
-    } else if (primitive.getPrimitiveTypeName() == INT32) {
-      return true;
+    public static boolean isFixedWidthBinary(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        OriginalType originalType = primitive.getOriginalType();
+        if (originalType == null &&
+                primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+            return true;
+        }
+        return false;
     }
-    return false;
-  }
 
-  public static boolean isLongType(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    if (originalType != null && (originalType ==  INT_64 || originalType == TIMESTAMP_MILLIS || originalType == TIMESTAMP_MICROS)) {
-      return true;
-    } else if (primitive.getPrimitiveTypeName() == INT64) {
-      return true;
+    public static boolean isIntType(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        OriginalType originalType = primitive.getOriginalType();
+        if (originalType != null && (originalType == OriginalType.INT_8 || originalType == OriginalType.INT_16 ||
+                originalType == OriginalType.INT_32 || originalType == OriginalType.DATE)) {
+            return true;
+        } else if (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+            return true;
+        }
+        return false;
     }
-    return false;
-  }
 
-  public static boolean isDoubleType(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    if (originalType == null && primitive.getPrimitiveTypeName() == DOUBLE) {
-      return true;
+    public static boolean isLongType(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        OriginalType originalType = primitive.getOriginalType();
+        if (originalType != null && (originalType == OriginalType.INT_64 || originalType == OriginalType.TIMESTAMP_MILLIS ||
+                originalType == OriginalType.TIMESTAMP_MICROS)) {
+            return true;
+        } else if (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) {
+            return true;
+        }
+        return false;
     }
-    return false;
-  }
 
-  public static boolean isFloatType(ColumnDescriptor desc) {
-    PrimitiveType primitive = desc.getPrimitiveType();
-    OriginalType originalType = primitive.getOriginalType();
-    if (originalType == null && primitive.getPrimitiveTypeName() == FLOAT) {
-      return true;
+    public static boolean isDoubleType(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        OriginalType originalType = primitive.getOriginalType();
+        if (originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
+            return true;
+        }
+        return false;
     }
-    return false;
-  }
 
-  @SuppressWarnings("deprecation")
-  public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
-    EncodingStats stats = meta.getEncodingStats();
-    if (stats != null) {
-      return stats.hasNonDictionaryEncodedPages();
+    public static boolean isFloatType(ColumnDescriptor desc) {
+        PrimitiveType primitive = desc.getPrimitiveType();
+        OriginalType originalType = primitive.getOriginalType();
+        if (originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+            return true;
+        }
+        return false;
     }
 
-    // without EncodingStats, fall back to testing the encoding list
-    Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
-    if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
-      // if remove returned true, PLAIN_DICTIONARY was present, which means at
-      // least one page was dictionary encoded and 1.0 encodings are used
+    @SuppressWarnings("deprecation")
+    public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
+        EncodingStats stats = meta.getEncodingStats();
+        if (stats != null) {
+            return stats.hasNonDictionaryEncodedPages();
+        }
 
-      // RLE and BIT_PACKED are only used for repetition or definition levels
-      encodings.remove(Encoding.RLE);
-      encodings.remove(Encoding.BIT_PACKED);
+        // without EncodingStats, fall back to testing the encoding list
+        Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
+        if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
+            // if remove returned true, PLAIN_DICTIONARY was present, which means at
+            // least one page was dictionary encoded and 1.0 encodings are used
 
-      if (encodings.isEmpty()) {
-        return false; // no encodings other than dictionary or rep/def levels
-      }
+            // RLE and BIT_PACKED are only used for repetition or definition levels
+            encodings.remove(Encoding.RLE);
+            encodings.remove(Encoding.BIT_PACKED);
 
-      return true;
+            if (encodings.isEmpty()) {
+                return false; // no encodings other than dictionary or rep/def levels
+            }
 
-    } else {
-      // if PLAIN_DICTIONARY wasn't present, then either the column is not
-      // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
-      // for 2.0, this cannot determine whether a page fell back without
-      // page encoding stats
-      return true;
+            return true;
+        } else {
+            // if PLAIN_DICTIONARY wasn't present, then either the column is not
+            // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
+            // for 2.0, this cannot determine whether a page fell back without
+            // page encoding stats
+            return true;
+        }
     }
-  }
 }
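
For illustration, here is a condensed, standalone restatement of the fallback logic in hasNonDictionaryPages() above (the path taken when a file carries no EncodingStats). The class name is hypothetical; only org.apache.parquet.column.Encoding is assumed:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.parquet.column.Encoding;

    class DictionaryFallbackSketch {
      static boolean hasNonDictionaryPages(Set<Encoding> columnEncodings) {
        Set<Encoding> encodings = new HashSet<>(columnEncodings);
        if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
          // RLE and BIT_PACKED encode only repetition/definition levels
          encodings.remove(Encoding.RLE);
          encodings.remove(Encoding.BIT_PACKED);
          // anything left over means at least one data page fell back
          return !encodings.isEmpty();
        }
        // without PLAIN_DICTIONARY, a 2.0 (RLE_DICTIONARY) fallback
        // cannot be ruled out, so assume non-dictionary pages exist
        return true;
      }
    }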
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
similarity index 50%
rename from parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
index 6bc4756..6100979 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
@@ -1,18 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iceberg.parquet;
@@ -20,27 +22,26 @@ package org.apache.iceberg.parquet;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import org.apache.arrow.vector.BigIntVector;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
 
 /**
- * Implements a {@link ValuesReader} specifically to read given number of bytes from the underlying
- * {@link ByteBufferInputStream}.
+ * Implements a {@link ValuesReader} specifically to read a given number of bytes from the underlying
+ * {@link ByteBufferInputStream}.
  */
-public class BytesReader extends ValuesReader {
-  private ByteBufferInputStream in = null;
+public class ValuesAsBytesReader extends ValuesReader {
+  private ByteBufferInputStream valuesInputStream = null;
   // Only used for booleans.
   private int bitOffset;
   private byte currentByte = 0;
 
-  public BytesReader() {
+  public ValuesAsBytesReader() {
   }
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream in) {
-    this.in = in;
+    this.valuesInputStream = in;
   }
 
   @Override
@@ -50,7 +51,7 @@ public class BytesReader extends ValuesReader {
 
   public ByteBuffer getBuffer(int length) {
     try {
-      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+      return valuesInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
     } catch (IOException e) {
       throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
     }
@@ -67,21 +68,20 @@ public class BytesReader extends ValuesReader {
       currentByte = getByte();
     }
 
-    boolean v = (currentByte & (1 << bitOffset)) != 0;
+    boolean value = (currentByte & (1 << bitOffset)) != 0;
     bitOffset += 1;
     if (bitOffset == 8) {
       bitOffset = 0;
     }
-    return v;
+    return value;
   }
 
   private byte getByte() {
     try {
-      return (byte) in.read();
+      return (byte) valuesInputStream.read();
     } catch (IOException e) {
       throw new ParquetDecodingException("Failed to read a byte", e);
     }
   }
-
 }
 
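As a self-contained sketch of the bit-level decoding that readBoolean() performs above (one boolean per bit, least significant bit first). The class is illustrative only and reads from a plain byte array rather than a ByteBufferInputStream:

    class BitPackedBooleanSketch {
      private final byte[] data;
      private int byteIndex = 0;
      private int bitOffset = 0;
      private byte currentByte = 0;

      BitPackedBooleanSketch(byte[] data) {
        this.data = data;
      }

      boolean readBoolean() {
        if (bitOffset == 0) {
          // fetch the next packed byte once every 8 values
          currentByte = data[byteIndex++];
        }
        boolean value = (currentByte & (1 << bitOffset)) != 0;
        bitOffset = (bitOffset + 1) % 8;
        return value;
      }
    }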
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/arrow/ArrowSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/ArrowSchemaUtil.java
new file mode 100644
index 0000000..1e7c4d4
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/ArrowSchemaUtil.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.arrow;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+
+public class ArrowSchemaUtil {
+  static final String ORIGINAL_TYPE = "originalType";
+  static final String MAP_TYPE = "mapType";
+  static final String MAP_KEY = "key";
+  static final String MAP_VALUE = "value";
+
+  private ArrowSchemaUtil() { }
+
+  /**
+   * Convert Iceberg schema to Arrow Schema.
+   *
+   * @param schema iceberg schema
+   * @return arrow schema
+   */
+  public static Schema convert(final org.apache.iceberg.Schema schema) {
+    final ImmutableList.Builder<Field> fields = ImmutableList.builder();
+
+    for (NestedField f : schema.columns()) {
+      fields.add(convert(f));
+    }
+
+    return new Schema(fields.build());
+  }
+
+  public static Field convert(final NestedField field) {
+    final ArrowType arrowType;
+
+    final List<Field> children = Lists.newArrayList();
+    Map<String, String> metadata = null;
+
+    switch (field.type().typeId()) {
+      case BINARY:
+        // Spark doesn't support BYTE(fixed_size) type, so cast it to VarBinary
+      case FIXED:
+        arrowType = ArrowType.Binary.INSTANCE;
+        break;
+      case BOOLEAN:
+        arrowType = ArrowType.Bool.INSTANCE;
+        break;
+      case INTEGER:
+        arrowType = new ArrowType.Int(Integer.SIZE, true);
+        break;
+      case LONG:
+        arrowType = new ArrowType.Int(Long.SIZE, true);
+        break;
+      case FLOAT:
+        arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+        break;
+      case DOUBLE:
+        arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+        break;
+      case DECIMAL:
+        final Types.DecimalType decimalType = (Types.DecimalType) field.type();
+        arrowType = new ArrowType.Decimal(decimalType.precision(), decimalType.scale());
+        break;
+      case STRING:
+        arrowType = ArrowType.Utf8.INSTANCE;
+        break;
+      case TIME:
+        arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
+        break;
+      case TIMESTAMP:
+        arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
+        break;
+      case DATE:
+        arrowType = new ArrowType.Date(DateUnit.DAY);
+        break;
+      case STRUCT:
+        final StructType struct = field.type().asStructType();
+        arrowType = ArrowType.Struct.INSTANCE;
+
+        for (NestedField nested : struct.fields()) {
+          children.add(convert(nested));
+        }
+        break;
+      case LIST:
+        final ListType listType = field.type().asListType();
+        arrowType = ArrowType.List.INSTANCE;
+
+        for (NestedField nested : listType.fields()) {
+          children.add(convert(nested));
+        }
+        break;
+      case MAP:
+        //Maps are represented as List<Struct<key, value>>
+        metadata = ImmutableMap.of(ORIGINAL_TYPE, MAP_TYPE);
+        final MapType mapType = field.type().asMapType();
+        arrowType = ArrowType.List.INSTANCE;
+
+        final List<Field> entryFields = Lists.newArrayList(
+            convert(required(0, MAP_KEY, mapType.keyType())),
+            convert(optional(0, MAP_VALUE, mapType.valueType()))
+        );
+
+        final Field entry = new Field("",
+            new FieldType(true, new ArrowType.Struct(), null), entryFields);
+        children.add(entry);
+        break;
+      default: throw new UnsupportedOperationException("Unsupported field type: " + field);
+    }
+
+    return new Field(field.name(), new FieldType(field.isOptional(), arrowType, null, metadata), children);
+  }
+}
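A brief usage sketch of the converter above; the two-column schema is illustrative only:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.parquet.arrow.ArrowSchemaUtil;
    import org.apache.iceberg.types.Types;

    class ArrowSchemaUtilExample {
      public static void main(String[] args) {
        Schema iceberg = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));
        // LONG maps to a 64-bit signed Int, STRING to Utf8, per convert()
        org.apache.arrow.vector.types.pojo.Schema arrow = ArrowSchemaUtil.convert(iceberg);
        System.out.println(arrow);
      }
    }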
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
similarity index 87%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
index 7b30948..214a9a4 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
@@ -17,11 +17,22 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
 
-import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.*;
+import java.math.BigInteger;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.holders.NullableVarCharHolder;
@@ -41,12 +52,9 @@ import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import java.math.BigInteger;
-
 /**
- * Implementation of Spark's {@link ColumnVector} interface. The main purpose
- * of this class is to prevent the expensive nullability checks made by Spark's
- * {@link ArrowColumnVector} implementation by delegating those calls to the
+ * Implementation of Spark's {@link ColumnVector} interface. The main purpose of this class is to prevent the expensive
+ * nullability checks made by Spark's {@link ArrowColumnVector} implementation by delegating those calls to
  * Iceberg's {@link NullabilityHolder}.
  */
 
@@ -68,7 +76,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
     this.accessor = getVectorAccessor(columnDescriptor, holder.getVector());
   }
 
-  @VisibleForTesting
+  // public for testing purposes only
   public ArrowVectorAccessor getAccessor() {
     return accessor;
   }
@@ -173,9 +181,11 @@ public class IcebergArrowColumnVector extends ColumnVector {
   }
 
   @Override
-  public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
+  public ArrowColumnVector getChild(int ordinal) {
+    return childColumns[ordinal];
+  }
 
-  @VisibleForTesting
+  // public for testing purposes only
   public abstract class ArrowVectorAccessor {
 
     private final ValueVector vector;
@@ -237,12 +247,13 @@ public class IcebergArrowColumnVector extends ColumnVector {
       throw new UnsupportedOperationException();
     }
 
-    @VisibleForTesting
+    // public for testing purposes only
     public ValueVector getUnderlyingArrowVector() {
       return vector;
     }
   }
 
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private ArrowVectorAccessor getVectorAccessor(ColumnDescriptor desc, ValueVector vector) {
     PrimitiveType primitive = desc.getPrimitiveType();
     if (isVectorDictEncoded) {
@@ -268,18 +279,27 @@ public class IcebergArrowColumnVector extends ColumnVector {
             switch (primitive.getPrimitiveTypeName()) {
               case BINARY:
               case FIXED_LEN_BYTE_ARRAY:
-                return new DictionaryDecimalBinaryAccessor((IntVector) vector, decimal.getPrecision(), decimal.getScale());
+                return new DictionaryDecimalBinaryAccessor(
+                    (IntVector) vector,
+                    decimal.getPrecision(),
+                    decimal.getScale());
               case INT64:
-                return new DictionaryDecimalLongAccessor((IntVector) vector, decimal.getPrecision(), decimal.getScale());
+                return new DictionaryDecimalLongAccessor(
+                    (IntVector) vector,
+                    decimal.getPrecision(),
+                    decimal.getScale());
               case INT32:
-                return new DictionaryDecimalIntAccessor((IntVector) vector, decimal.getPrecision(), decimal.getScale());
+                return new DictionaryDecimalIntAccessor(
+                    (IntVector) vector,
+                    decimal.getPrecision(),
+                    decimal.getScale());
               default:
                 throw new UnsupportedOperationException(
-                        "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
             }
           default:
             throw new UnsupportedOperationException(
-                    "Unsupported logical type: " + primitive.getOriginalType());
+                "Unsupported logical type: " + primitive.getOriginalType());
         }
       } else {
         switch (primitive.getPrimitiveTypeName()) {
@@ -290,10 +310,10 @@ public class IcebergArrowColumnVector extends ColumnVector {
             return new DictionaryIntAccessor((IntVector) vector);
           case FLOAT:
             return new DictionaryFloatAccessor((IntVector) vector);
-//        case BOOLEAN:
-//          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-//          ((BitVector) vec).allocateNew(batchSize);
-//          return UNKNOWN_WIDTH;
+          //        case BOOLEAN:
+          //          this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+          //          ((BitVector) vec).allocateNew(batchSize);
+          //          return UNKNOWN_WIDTH;
           case INT64:
             return new DictionaryLongAccessor((IntVector) vector);
           case DOUBLE:
@@ -332,12 +352,12 @@ public class IcebergArrowColumnVector extends ColumnVector {
         return new ArrayAccessor(listVector);
       } else if (vector instanceof StructVector) {
         StructVector structVector = (StructVector) vector;
-        ArrowVectorAccessor accessor = new StructAccessor(structVector);
+        ArrowVectorAccessor structAccessor = new StructAccessor(structVector);
         childColumns = new ArrowColumnVector[structVector.size()];
         for (int i = 0; i < childColumns.length; ++i) {
           childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
         }
-        return accessor;
+        return structAccessor;
       }
     }
     throw new UnsupportedOperationException();
@@ -519,7 +539,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
 
     @Override
     final Decimal getDecimal(int rowId, int precision, int scale) {
-      if (isNullAt(rowId)) return null;
+      if (isNullAt(rowId)) {
+        return null;
+      }
       return Decimal.apply(vector.getObject(rowId), precision, scale);
     }
   }
@@ -540,7 +562,8 @@ public class IcebergArrowColumnVector extends ColumnVector {
       if (stringResult.isSet == 0) {
         return null;
       } else {
-        return UTF8String.fromAddress(null,
+        return UTF8String.fromAddress(
+            null,
             stringResult.buffer.memoryAddress() + stringResult.start,
             stringResult.end - stringResult.start);
       }
@@ -612,7 +635,6 @@ public class IcebergArrowColumnVector extends ColumnVector {
     }
   }
 
-
   private class DateAccessor extends ArrowVectorAccessor {
 
     private final DateDayVector vector;
@@ -688,11 +710,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
 
   /**
    * Any call to "get" method will throw UnsupportedOperationException.
-   *
-   * Access struct values in a ArrowColumnVector doesn't use this vector. Instead, it uses
-   * getStruct() method defined in the parent class. Any call to "get" method in this class is a
-   * bug in the code.
-   *
+   * <p>
+   * Accessing struct values in an ArrowColumnVector doesn't use this vector. Instead, it uses the getStruct() method
+   * defined in the parent class. Any call to a "get" method in this class is a bug in the code.
    */
   private class StructAccessor extends ArrowVectorAccessor {
 
@@ -704,7 +724,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
   private class DictionaryDecimalBinaryAccessor extends ArrowVectorAccessor {
     private final IntVector vector;
 
-    public DictionaryDecimalBinaryAccessor(IntVector vector, int precision, int scale) {
+    DictionaryDecimalBinaryAccessor(IntVector vector, int precision, int scale) {
       super(vector);
       this.vector = vector;
     }
@@ -713,7 +733,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
     //TODO: samarth refer to decodeDictionaryIds in VectorizedColumnReader
     @Override
     final Decimal getDecimal(int rowId, int precision, int scale) {
-      if (isNullAt(rowId)) return null;
+      if (isNullAt(rowId)) {
+        return null;
+      }
       Binary value = dictionary.decodeToBinary(vector.get(rowId));
       BigInteger unscaledValue = new BigInteger(value.getBytesUnsafe());
       return Decimal.apply(unscaledValue.longValue(), precision, scale);
@@ -723,7 +745,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
   private class DictionaryDecimalLongAccessor extends ArrowVectorAccessor {
     private final IntVector vector;
 
-    public DictionaryDecimalLongAccessor(IntVector vector, int precision, int scale) {
+    DictionaryDecimalLongAccessor(IntVector vector, int precision, int scale) {
       super(vector);
       this.vector = vector;
     }
@@ -731,7 +753,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
     //TODO: samarth not sure this is efficient or correct
     @Override
     final Decimal getDecimal(int rowId, int precision, int scale) {
-      if (isNullAt(rowId)) return null;
+      if (isNullAt(rowId)) {
+        return null;
+      }
       long unscaledValue = dictionary.decodeToLong(vector.get(rowId));
       return Decimal.apply(unscaledValue, precision, scale);
     }
@@ -740,7 +764,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
   private class DictionaryDecimalIntAccessor extends ArrowVectorAccessor {
     private final IntVector vector;
 
-    public DictionaryDecimalIntAccessor(IntVector vector, int precision, int scale) {
+    DictionaryDecimalIntAccessor(IntVector vector, int precision, int scale) {
       super(vector);
       this.vector = vector;
     }
@@ -748,7 +772,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
     //TODO: samarth not sure this is efficient or correct
     @Override
     final Decimal getDecimal(int rowId, int precision, int scale) {
-      if (isNullAt(rowId)) return null;
+      if (isNullAt(rowId)) {
+        return null;
+      }
       int unscaledValue = dictionary.decodeToInt(vector.get(rowId));
       return Decimal.apply(unscaledValue, precision, scale);
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
similarity index 88%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
index 9a833b0..932f30a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
@@ -17,17 +17,15 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
 
 /**
- *
- * Extension of Arrow's @{@link DecimalVector}. The whole reason of having
- * this implementation is to override the expensive {@link DecimalVector#isSet(int)} method
- * used by  {@link DecimalVector#getObject(int)}.
+ * Extension of Arrow's {@link DecimalVector}. The sole reason for having this implementation is to override the
+ * expensive {@link DecimalVector#isSet(int)} method used by {@link DecimalVector#getObject(int)}.
  */
 public class IcebergDecimalArrowVector extends DecimalVector {
   private NullabilityHolder nullabilityHolder;
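The override itself is not shown in this hunk; a hypothetical sketch of the technique used by this wrapper (and the sibling wrappers below), assuming the NullabilityHolder API from this branch — the class name is illustrative:

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.DecimalVector;
    import org.apache.iceberg.parquet.vectorized.NullabilityHolder;

    class NullabilityDelegatingDecimalVector extends DecimalVector {
      private final NullabilityHolder nullabilityHolder;

      NullabilityDelegatingDecimalVector(
          String name, BufferAllocator allocator, int precision, int scale,
          NullabilityHolder nullabilityHolder) {
        super(name, allocator, precision, scale);
        this.nullabilityHolder = nullabilityHolder;
      }

      public int isSet(int index) {
        // consult the batch-level null flags instead of probing Arrow's
        // validity buffer bit by bit, as getObject(int) otherwise would
        return nullabilityHolder.isNullAt(index) ? 0 : 1;
      }
    }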
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
similarity index 90%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
index 6d8922b..2dc5fb1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
@@ -17,15 +17,15 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
 
 /**
- * Extension of Arrow's @{@link VarBinaryVector}. The whole reason of having
- * this implementation is to override the expensive {@link VarBinaryVector#isSet(int)} method.
+ * Extension of Arrow's {@link VarBinaryVector}. The sole reason for having this implementation is to override the
+ * expensive {@link VarBinaryVector#isSet(int)} method.
  */
 public class IcebergVarBinaryArrowVector extends VarBinaryVector {
   private NullabilityHolder nullabilityHolder;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
similarity index 88%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
index 646ddd1..0802f6c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
@@ -17,15 +17,15 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
 
 /**
- * Extension of Arrow's @{@link VarCharVector}. The reason of having
- * this implementation is to override the expensive {@link VarCharVector#isSet(int)} method.
+ * Extension of Arrow's {@link VarCharVector}. The reason for having this implementation is to override the expensive
+ * {@link VarCharVector#isSet(int)} method.
  */
 public class IcebergVarcharArrowVector extends VarCharVector {
 
@@ -40,7 +40,7 @@ public class IcebergVarcharArrowVector extends VarCharVector {
   /**
    * Same as {@link #isNull(int)}.
    *
-   * @param index  position of element
+   * @param index position of element
    * @return 1 if element at given index is not null, 0 otherwise
    */
   public int isSet(int index) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
index 64c78af..5ac391b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
@@ -1,7 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iceberg.parquet.vectorized;
 
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Map;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
+import org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
@@ -9,60 +31,56 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
-import java.lang.reflect.Array;
-import java.util.List;
-import java.util.Map;
-
 /**
- * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's
- * vectorized read path. The {@link ColumnarBatch} returned is created by passing in the
- * Arrow vectors populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
+ * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized read path. The
+ * {@link ColumnarBatch} returned is created by passing in the Arrow vectors populated via delegated read calls to
+ * {@linkplain VectorizedArrowReader VectorReader(s)}.
  */
 public class ColumnarBatchReaders implements VectorizedReader {
-    private final VectorizedArrowReader[] readers;
-
-    public ColumnarBatchReaders(List<Type> types,
-                                Types.StructType icebergExpectedFields,
-                                List<VectorizedReader> readers) {
-        this.readers = (VectorizedArrowReader[]) Array.newInstance(
-                VectorizedArrowReader.class, readers.size());
-        int i = 0;
-        for (VectorizedReader reader : readers) {
-            this.readers[i] = (VectorizedArrowReader) reader;
-            i++;
-        }
+  private final VectorizedArrowReader[] readers;
 
+  public ColumnarBatchReaders(
+      List<Type> types,
+      Types.StructType icebergExpectedFields,
+      List<VectorizedReader> readers) {
+    this.readers = (VectorizedArrowReader[]) Array.newInstance(
+        VectorizedArrowReader.class, readers.size());
+    int idx = 0;
+    for (VectorizedReader reader : readers) {
+      this.readers[idx] = (VectorizedArrowReader) reader;
+      idx++;
     }
+  }
 
-    public final void setRowGroupInfo(PageReadStore pageStore,
-                                      DictionaryPageReadStore dictionaryPageReadStore,
-                                      Map<ColumnPath, Boolean> columnDictEncoded) {
-        for (int i = 0; i < readers.length; i += 1) {
-            readers[i].setRowGroupInfo(pageStore, dictionaryPageReadStore, columnDictEncoded);
-        }
+  public final void setRowGroupInfo(
+      PageReadStore pageStore,
+      DictionaryPageReadStore dictionaryPageReadStore,
+      Map<ColumnPath, Boolean> columnDictEncoded) {
+    for (int i = 0; i < readers.length; i += 1) {
+      readers[i].setRowGroupInfo(pageStore, dictionaryPageReadStore, columnDictEncoded);
     }
+  }
 
-    public final ColumnarBatch read(ColumnarBatch ignore) {
-        IcebergArrowColumnVector[] icebergArrowColumnVectors = (IcebergArrowColumnVector[]) Array.newInstance(IcebergArrowColumnVector.class,
-            readers.length);
-        int numRows = 0;
-        for (int i = 0; i < readers.length; i += 1) {
-            NullabilityHolder nullabilityHolder = new NullabilityHolder(readers[i].batchSize());
-            VectorHolder holder = readers[i].read(nullabilityHolder);
-            FieldVector vector = holder.getVector();
-            icebergArrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
-            if (i > 0 && numRows != vector.getValueCount()) {
-                throw new IllegalStateException("Different number of values returned by readers" +
-                    "for columns " + readers[i - 1] + " and " + readers[i]);
-            }
-            numRows = vector.getValueCount();
-        }
-
-        ColumnarBatch batch = new ColumnarBatch(icebergArrowColumnVectors);
-        batch.setNumRows(numRows);
-
-        return batch;
+  public final ColumnarBatch read(ColumnarBatch ignore) {
+    IcebergArrowColumnVector[] icebergArrowColumnVectors = (IcebergArrowColumnVector[]) Array.newInstance(
+        IcebergArrowColumnVector.class,
+        readers.length);
+    int numRows = 0;
+    for (int i = 0; i < readers.length; i += 1) {
+      NullabilityHolder nullabilityHolder = new NullabilityHolder(readers[i].batchSize());
+      VectorHolder holder = readers[i].read(nullabilityHolder);
+      FieldVector vector = holder.getVector();
+      icebergArrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
+      if (i > 0 && numRows != vector.getValueCount()) {
+        throw new IllegalStateException("Different number of values returned by readers " +
+            "for columns " + readers[i - 1] + " and " + readers[i]);
+      }
+      numRows = vector.getValueCount();
     }
 
-}
+    ColumnarBatch batch = new ColumnarBatch(icebergArrowColumnVectors);
+    batch.setNumRows(numRows);
 
+    return batch;
+  }
+}
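The read() method above enforces that every column reader returns the same number of values. A minimal standalone sketch of that invariant check, with a hypothetical list of per-column value counts standing in for the Arrow vectors:

    import java.util.List;

    class RowCountInvariant {
      static int requireEqualCounts(List<Integer> valueCounts) {
        int numRows = -1;
        for (int count : valueCounts) {
          if (numRows >= 0 && count != numRows) {
            throw new IllegalStateException(
                "Different number of values returned by readers: " + numRows + " vs " + count);
          }
          numRows = count;
        }
        return numRows; // rows in the resulting ColumnarBatch
      }
    }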
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
index 70b84d9..db625ea 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iceberg.parquet.vectorized;
 
 public class NullabilityHolder {
@@ -26,22 +27,11 @@ public class NullabilityHolder {
     this.isNull = new boolean[batchSize];
   }
 
-
   public void setNull(int idx) {
-    isNull[idx] =  true;
+    isNull[idx] = true;
     numNulls++;
   }
 
-  public void setNulls(int idx, int num) {
-    int i = 0;
-    while (i < num) {
-      isNull[idx] = true;
-      numNulls++;
-      idx++;
-      i++;
-    }
-  }
-
   public boolean isNullAt(int idx) {
     return isNull[idx];
   }
@@ -53,4 +43,4 @@ public class NullabilityHolder {
   public int numNulls() {
     return numNulls;
   }
-}
+}
\ No newline at end of file
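
NullabilityHolder is deliberately simple: one boolean per batch slot plus a running null count, tracked independently of Arrow's validity buffers. During decoding, a slot is flagged as null whenever a value's definition level is below the column's maximum. A hedged sketch of that use (the method below is illustrative, not code from this patch):

    // Illustrative only: flag null slots based on definition levels.
    // For a flat column, a definition level below the maximum means null.
    static int markNulls(int[] definitionLevels, int maxDefinitionLevel, boolean[] isNull) {
      int numNulls = 0;
      for (int idx = 0; idx < definitionLevels.length; idx += 1) {
        if (definitionLevels[idx] < maxDefinitionLevel) {
          isNull[idx] = true;
          numNulls += 1;
        }
      }
      return numNulls;
    }
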
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
index f11c968..d28a6af 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
@@ -19,46 +19,47 @@
 
 package org.apache.iceberg.parquet.vectorized;
 
+import javax.annotation.Nullable;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
 
-import javax.annotation.Nullable;
-
 /**
- * Container class for holding the Arrow vector holding a batch of values
- * along with other state needed for reading values out of it.
+ * Container class for holding the Arrow vector holding a batch of values along with other state needed for reading
+ * values out of it.
  */
 public class VectorHolder {
-    private final ColumnDescriptor columnDescriptor;
-    private final FieldVector vector;
-    private final boolean isDictionaryEncoded;
-
-    @Nullable
-    private final Dictionary dictionary;
-
+  private final ColumnDescriptor columnDescriptor;
+  private final FieldVector vector;
+  private final boolean isDictionaryEncoded;
 
-    public VectorHolder(ColumnDescriptor columnDescriptor, FieldVector vector, boolean isDictionaryEncoded, Dictionary dictionary) {
-        this.columnDescriptor = columnDescriptor;
-        this.vector = vector;
-        this.isDictionaryEncoded = isDictionaryEncoded;
-        this.dictionary = dictionary;
-    }
+  @Nullable
+  private final Dictionary dictionary;
 
-    public ColumnDescriptor getDescriptor() {
-        return columnDescriptor;
-    }
+  public VectorHolder(
+      ColumnDescriptor columnDescriptor,
+      FieldVector vector,
+      boolean isDictionaryEncoded,
+      Dictionary dictionary) {
+    this.columnDescriptor = columnDescriptor;
+    this.vector = vector;
+    this.isDictionaryEncoded = isDictionaryEncoded;
+    this.dictionary = dictionary;
+  }
 
-    public FieldVector getVector() {
-        return vector;
-    }
+  public ColumnDescriptor getDescriptor() {
+    return columnDescriptor;
+  }
 
-    public boolean isDictionaryEncoded() {
-        return isDictionaryEncoded;
-    }
+  public FieldVector getVector() {
+    return vector;
+  }
 
-    public Dictionary getDictionary() {
-        return dictionary;
-    }
+  public boolean isDictionaryEncoded() {
+    return isDictionaryEncoded;
+  }
 
-}
+  public Dictionary getDictionary() {
+    return dictionary;
+  }
+}
\ No newline at end of file
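
VectorHolder bundles the vector with the state a consumer needs to interpret it; in particular, isDictionaryEncoded() tells the consumer whether the vector holds raw dictionary ids that still require a lookup. A hypothetical consumer-side sketch of that decision:

    // Hypothetical consumer: dictionary-encoded vectors store ids,
    // plain vectors store the values themselves.
    static Object valueAt(int[] vectorData, boolean isDictionaryEncoded,
                          java.util.List<Object> dictionary, int row) {
      int raw = vectorData[row];
      return isDictionaryEncoded ? dictionary.get(raw) : raw;
    }
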
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
index f44c5dc..9da909d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
@@ -19,16 +19,25 @@
 
 package org.apache.iceberg.parquet.vectorized;
 
+import java.util.Map;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
-import org.apache.iceberg.arrow.ArrowSchemaUtil;
 import org.apache.iceberg.parquet.ParquetUtil;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergDecimalArrowVector;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarBinaryArrowVector;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarcharArrowVector;
+import org.apache.iceberg.parquet.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.arrow.IcebergDecimalArrowVector;
+import org.apache.iceberg.parquet.arrow.IcebergVarBinaryArrowVector;
+import org.apache.iceberg.parquet.arrow.IcebergVarcharArrowVector;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -38,8 +47,6 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.PrimitiveType;
 
-import java.util.Map;
-
 /***
  * {@link VectorizedReader VectorReader(s)} that read in a batch of values into Arrow vectors.
  * It also takes care of allocating the right kind of Arrow vectors depending on the corresponding
@@ -79,12 +86,12 @@ public class VectorizedArrowReader implements VectorizedReader {
   public VectorizedArrowReader(
       ColumnDescriptor desc,
       Types.NestedField icebergField,
-      BufferAllocator rootAlloc,
+      BufferAllocator rootAllocator,
       int batchSize) {
     this.icebergField = icebergField;
     this.batchSize = (batchSize == 0) ? DEFAULT_BATCH_SIZE : batchSize;
     this.columnDescriptor = desc;
-    this.rootAlloc = rootAlloc;
+    this.rootAlloc = rootAllocator;
     this.isFixedLengthDecimal = ParquetUtil.isFixedLengthDecimal(desc);
     this.isVarWidthType = ParquetUtil.isVarWidthType(desc);
     this.isFixedWidthBinary = ParquetUtil.isFixedWidthBinary(desc);
@@ -97,9 +104,10 @@ public class VectorizedArrowReader implements VectorizedReader {
     this.vectorizedColumnIterator = new VectorizedColumnIterator(desc, "", batchSize);
   }
 
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public VectorHolder read(NullabilityHolder nullabilityHolder) {
     if (vec == null) {
-      typeWidth = allocateFieldVector(rootAlloc, icebergField, columnDescriptor);
+      typeWidth = allocateFieldVector();
     }
     vec.setValueCount(0);
     if (vectorizedColumnIterator.hasNext()) {
@@ -141,16 +149,19 @@ public class VectorizedArrowReader implements VectorizedReader {
     return new VectorHolder(columnDescriptor, vec, allPagesDictEncoded, dictionary);
   }
 
-  private int allocateFieldVector(BufferAllocator rootAlloc, Types.NestedField icebergField, ColumnDescriptor desc) {
+  private int allocateFieldVector() {
     if (allPagesDictEncoded) {
-      Field field = new Field(icebergField.name(), new FieldType(icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null), null);
+      Field field = new Field(
+          icebergField.name(),
+          new FieldType(icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null),
+          null);
       this.vec = field.createVector(rootAlloc);
       ((IntVector) vec).allocateNew(batchSize);
       return IntVector.TYPE_WIDTH;
     } else {
-      PrimitiveType primitive = desc.getPrimitiveType();
+      PrimitiveType primitive = columnDescriptor.getPrimitiveType();
       if (primitive.getOriginalType() != null) {
-        switch (desc.getPrimitiveType().getOriginalType()) {
+        switch (columnDescriptor.getPrimitiveType().getOriginalType()) {
           case ENUM:
           case JSON:
           case UTF8:
@@ -182,7 +193,7 @@ public class VectorizedArrowReader implements VectorizedReader {
           case DECIMAL:
             DecimalMetadata decimal = primitive.getDecimalMetadata();
             this.vec = new IcebergDecimalArrowVector(icebergField.name(), rootAlloc, decimal.getPrecision(),
-                    decimal.getScale());
+                decimal.getScale());
             ((DecimalVector) vec).allocateNew(batchSize);
             switch (primitive.getPrimitiveTypeName()) {
               case BINARY:
@@ -194,18 +205,18 @@ public class VectorizedArrowReader implements VectorizedReader {
                 return IntVector.TYPE_WIDTH;
               default:
                 throw new UnsupportedOperationException(
-                        "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
             }
           default:
             throw new UnsupportedOperationException(
-                    "Unsupported logical type: " + primitive.getOriginalType());
+                "Unsupported logical type: " + primitive.getOriginalType());
         }
       } else {
         switch (primitive.getPrimitiveTypeName()) {
           case FIXED_LEN_BYTE_ARRAY:
             int len = ((Types.FixedType) icebergField.type()).length();
             this.vec = new IcebergVarBinaryArrowVector(icebergField.name(), rootAlloc);
-            int factor = (len + DEFAULT_RECORD_BYTE_COUNT - 1) / (DEFAULT_RECORD_BYTE_COUNT);
+            int factor = (len + DEFAULT_RECORD_BYTE_COUNT - 1) / DEFAULT_RECORD_BYTE_COUNT;
             vec.setInitialCapacity(batchSize * factor);
             vec.allocateNew();
             return len;
@@ -242,9 +253,10 @@ public class VectorizedArrowReader implements VectorizedReader {
     }
   }
 
-  public void setRowGroupInfo(PageReadStore source,
-                              DictionaryPageReadStore dictionaryPageReadStore,
-                              Map<ColumnPath, Boolean> columnDictEncoded) {
+  public void setRowGroupInfo(
+      PageReadStore source,
+      DictionaryPageReadStore dictionaryPageReadStore,
+      Map<ColumnPath, Boolean> columnDictEncoded) {
     allPagesDictEncoded = columnDictEncoded.get(ColumnPath.get(columnDescriptor.getPath()));
     dictionary = vectorizedColumnIterator.setRowGroupInfo(source, dictionaryPageReadStore, allPagesDictEncoded);
   }
@@ -257,6 +269,4 @@ public class VectorizedArrowReader implements VectorizedReader {
   public int batchSize() {
     return batchSize;
   }
-
 }
-
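
One arithmetic detail in allocateFieldVector is worth spelling out: for FIXED_LEN_BYTE_ARRAY columns, the initial capacity is scaled by (len + DEFAULT_RECORD_BYTE_COUNT - 1) / DEFAULT_RECORD_BYTE_COUNT, the standard integer idiom for ceiling division. It computes how many DEFAULT_RECORD_BYTE_COUNT-sized units are needed to cover len bytes per record. A standalone illustration (the value 8 below is assumed purely for the example; the actual constant is defined elsewhere in the class):

    // Integer ceiling division: how many b-sized units cover a bytes.
    static int ceilDiv(int a, int b) {
      return (a + b - 1) / b;
    }

    // With an assumed unit of 8 bytes:
    //   ceilDiv(7, 8) == 1  -> initial capacity batchSize * 1
    //   ceilDiv(9, 8) == 2  -> initial capacity batchSize * 2
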
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
index fafaec6..703e68e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
@@ -20,17 +20,20 @@
 package org.apache.iceberg.parquet.vectorized;
 
 import java.io.IOException;
-
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.io.ParquetDecodingException;
 
 /**
- * Vectorized version of the ColumnIterator that reads column values in data pages of a column
- * in a row group in a batched fashion.
+ * Vectorized version of the ColumnIterator that reads column values in data pages of a column in a row group in a
+ * batched fashion.
  */
 public class VectorizedColumnIterator {
 
@@ -50,15 +53,16 @@ public class VectorizedColumnIterator {
     this.vectorizedPageIterator = new VectorizedPageIterator(desc, writerVersion, batchSize);
   }
 
-  public Dictionary setRowGroupInfo(PageReadStore store,
-                                    DictionaryPageReadStore dictionaryPageReadStore,
-                                    boolean allPagesDictEncoded) {
+  public Dictionary setRowGroupInfo(
+      PageReadStore store,
+      DictionaryPageReadStore dictionaryPageReadStore,
+      boolean allPagesDictEncoded) {
     this.columnPageReader = store.getPageReader(desc);
     this.totalValuesCount = columnPageReader.getTotalValueCount();
     this.valuesRead = 0L;
     this.advanceNextPageCount = 0L;
     this.vectorizedPageIterator.reset();
-    Dictionary dict = readDictionaryForColumn(desc, dictionaryPageReadStore);
+    Dictionary dict = readDictionaryForColumn(dictionaryPageReadStore);
     this.vectorizedPageIterator.setDictionaryForColumn(dict, allPagesDictEncoded);
     advance();
     return dict;
@@ -91,7 +95,7 @@ public class VectorizedColumnIterator {
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = vectorizedPageIterator.nextBatchIntegers(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, typeWidth, holder);
+          rowsReadSoFar, typeWidth, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
@@ -106,7 +110,7 @@ public class VectorizedColumnIterator {
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = vectorizedPageIterator.nextBatchDictionaryIds(vector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, holder);
+          rowsReadSoFar, holder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       vector.setValueCount(rowsReadSoFar);
@@ -161,11 +165,15 @@ public class VectorizedColumnIterator {
   /**
    * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
    */
-  public void nextBatchIntLongBackedDecimal(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
+  public void nextBatchIntLongBackedDecimal(
+      FieldVector fieldVector,
+      int typeWidth,
+      NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch =
+          vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -176,11 +184,15 @@ public class VectorizedColumnIterator {
   /**
    * Method for reading a batch of decimals backed by fixed length byte array parquet data type.
    */
-  public void nextBatchFixedLengthDecimal(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
+  public void nextBatchFixedLengthDecimal(
+      FieldVector fieldVector,
+      int typeWidth,
+      NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch =
+          vectorizedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -196,7 +208,7 @@ public class VectorizedColumnIterator {
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = vectorizedPageIterator.nextBatchVarWidthType(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, nullabilityHolder);
+          rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
@@ -210,7 +222,8 @@ public class VectorizedColumnIterator {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
-      int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
+      int rowsInThisBatch =
+          vectorizedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
               rowsReadSoFar, typeWidth, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
@@ -226,15 +239,15 @@ public class VectorizedColumnIterator {
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch = vectorizedPageIterator.nextBatchBoolean(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, nullabilityHolder);
+          rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.valuesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
     }
   }
 
-  private Dictionary readDictionaryForColumn(ColumnDescriptor desc,
-                                             DictionaryPageReadStore dictionaryPageReadStore) {
+  private Dictionary readDictionaryForColumn(
+      DictionaryPageReadStore dictionaryPageReadStore) {
     if (dictionaryPageReadStore == null) {
       return null;
     }
@@ -248,5 +261,4 @@ public class VectorizedColumnIterator {
     }
     return null;
   }
-
-}
+}
\ No newline at end of file
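
All of the nextBatch* methods in this class share one shape: keep pulling from the page iterator until the requested batch is full or the row group runs out, asking each call for only the remaining capacity and updating the vector's value count as rows arrive. A generic sketch of that drain loop, with PageSource standing in for the per-type vectorizedPageIterator calls:

    // Generic batch-drain loop; readPage stands in for the
    // per-type vectorizedPageIterator.nextBatch* calls.
    interface PageSource {
      boolean hasNext();
      int readPage(int maxRows);  // returns rows actually read from the current page
    }

    static int fillBatch(PageSource source, int batchSize) {
      int rowsReadSoFar = 0;
      while (rowsReadSoFar < batchSize && source.hasNext()) {
        // A page may hold fewer rows than requested, so accumulate.
        rowsReadSoFar += source.readPage(batchSize - rowsReadSoFar);
      }
      return rowsReadSoFar;
    }
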
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
index 618be17..c62baf3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
@@ -19,12 +19,15 @@
 
 package org.apache.iceberg.parquet.vectorized;
 
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+
 import com.google.common.base.Preconditions;
+import java.io.IOException;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.iceberg.parquet.BytesReader;
+import org.apache.iceberg.parquet.ValuesAsBytesReader;
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
@@ -42,404 +45,519 @@ import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-
 public class VectorizedPageIterator {
-    private static final Logger LOG = LoggerFactory.getLogger(VectorizedPageIterator.class);
-    public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
-        this.desc = desc;
-        this.writerVersion = writerVersion;
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedPageIterator.class);
+
+  public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
+    this.desc = desc;
+    this.writerVersion = writerVersion;
+  }
+
+  private final ColumnDescriptor desc;
+  private final String writerVersion;
+
+  // iterator state
+  private boolean hasNext = false;
+  private int triplesRead = 0;
+
+  // page bookkeeping
+  private Dictionary dict = null;
+  private DataPage page = null;
+  private int triplesCount = 0;
+
+  // Needed once we add support for complex types. Unused for now.
+  private IntIterator repetitionLevels = null;
+  private int currentRL = 0;
+
+  private VectorizedParquetValuesReader definitionLevelReader;
+  private boolean eagerDecodeDictionary;
+  private ValuesAsBytesReader plainValuesReader = null;
+  private VectorizedParquetValuesReader dictionaryEncodedValuesReader = null;
+  private boolean allPagesDictEncoded;
+
+  public void setPage(DataPage page) {
+    this.page = Preconditions.checkNotNull(page, "Cannot read from null page");
+    this.page.accept(new DataPage.Visitor<ValuesReader>() {
+      @Override
+      public ValuesReader visit(DataPageV1 dataPageV1) {
+        initFromPage(dataPageV1);
+        return null;
+      }
+
+      @Override
+      public ValuesReader visit(DataPageV2 dataPageV2) {
+        initFromPage(dataPageV2);
+        return null;
+      }
+    });
+    this.triplesRead = 0;
+    advance();
+  }
+
+  // Dictionary is set per row group
+  public void setDictionaryForColumn(Dictionary dict, boolean allPagesDictEncoded) {
+    this.dict = dict;
+    this.allPagesDictEncoded = allPagesDictEncoded;
+  }
+
+  public void reset() {
+    this.page = null;
+    this.triplesCount = 0;
+    this.triplesRead = 0;
+    this.repetitionLevels = null;
+    this.plainValuesReader = null;
+    this.definitionLevelReader = null;
+    this.hasNext = false;
+  }
+
+  public int currentPageCount() {
+    return triplesCount;
+  }
+
+  public boolean hasNext() {
+    return hasNext;
+  }
+
+  /**
+   * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like definition levels,
+   * dictionary ids in Parquet are RLE encoded as well.
+   */
+  public int nextBatchDictionaryIds(
+      final IntVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    private final ColumnDescriptor desc;
-    private final String writerVersion;
-
-    // iterator state
-    private boolean hasNext = false;
-    private int triplesRead = 0;
-
-    // page bookkeeping
-    private Dictionary dict = null;
-    private DataPage page = null;
-    private int triplesCount = 0;
-
-    // Needed once we add support for complex types. Unused for now.
-    private IntIterator repetitionLevels = null;
-    private int currentRL = 0;
-
-    private VectorizedParquetValuesReader definitionLevelReader;
-    private boolean eagerDecodeDictionary;
-    private BytesReader plainValuesReader = null;
-    private VectorizedParquetValuesReader dictionaryEncodedValuesReader = null;
-    private boolean allPagesDictEncoded;
-
-    public void setPage(DataPage page) {
-        this.page = Preconditions.checkNotNull(page, "Cannot read from null page");
-        this.page.accept(new DataPage.Visitor<ValuesReader>() {
-            @Override
-            public ValuesReader visit(DataPageV1 dataPageV1) {
-                initFromPage(dataPageV1);
-                return null;
-            }
-
-            @Override
-            public ValuesReader visit(DataPageV2 dataPageV2) {
-                initFromPage(dataPageV2);
-                return null;
-            }
-        });
-        this.triplesRead = 0;
-        advance();
+    definitionLevelReader.readBatchOfDictionaryIds(
+        vector,
+        numValsInVector,
+        actualBatchSize,
+        holder,
+        dictionaryEncodedValuesReader);
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of INT32 data type.
+   */
+  public int nextBatchIntegers(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    // Dictionary is set per row group
-    public void setDictionaryForColumn(Dictionary dict, boolean allPagesDictEncoded) {
-        this.dict = dict;
-        this.allPagesDictEncoded = allPagesDictEncoded;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedIntegers(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchOfIntegers(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
     }
-
-    public void reset() {
-        this.page = null;
-        this.triplesCount = 0;
-        this.triplesRead = 0;
-        this.repetitionLevels = null;
-        this.plainValuesReader = null;
-        this.definitionLevelReader = null;
-        this.hasNext = false;
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of INT64 data type.
+   */
+  public int nextBatchLongs(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    public int currentPageCount() {
-        return triplesCount;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedLongs(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchOfLongs(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
     }
-
-    public boolean hasNext() {
-        return hasNext;
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of FLOAT data type.
+   */
+  public int nextBatchFloats(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    /**
-     * Method for reading a batch of dictionary ids from the dicitonary encoded data pages. Like definition levels, dictionary ids in Parquet are RLE
-     * encoded as well.
-     */
-    public int nextBatchDictionaryIds(final IntVector vector, final int expectedBatchSize,
-                                      final int numValsInVector,
-                                      NullabilityHolder holder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        definitionLevelReader.readBatchOfDictionaryIds(vector, numValsInVector, actualBatchSize, holder, dictionaryEncodedValuesReader);
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedFloats(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchOfFloats(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
     }
-
-    /**
-     * Method for reading a batch of values of INT32 data type
-     */
-    public int nextBatchIntegers(final FieldVector vector, final int expectedBatchSize,
-                                 final int numValsInVector,
-                                 final int typeWidth, NullabilityHolder holder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedIntegers(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchOfIntegers(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of DOUBLE data type.
+   */
+  public int nextBatchDoubles(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    /**
-     * Method for reading a batch of values of INT64 data type
-     */
-    public int nextBatchLongs(final FieldVector vector, final int expectedBatchSize,
-                              final int numValsInVector,
-                              final int typeWidth, NullabilityHolder holder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedLongs(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchOfLongs(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedDoubles(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchOfDoubles(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
     }
-
-    /**
-     * Method for reading a batch of values of FLOAT data type.
-     */
-    public int nextBatchFloats(final FieldVector vector, final int expectedBatchSize,
-                               final int numValsInVector,
-                               final int typeWidth, NullabilityHolder holder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedFloats(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchOfFloats(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  private int getActualBatchSize(int expectedBatchSize) {
+    return Math.min(expectedBatchSize, triplesCount - triplesRead);
+  }
+
+  /**
+   * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since Arrow stores all
+   * decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
+   */
+  public int nextBatchIntLongBackedDecimal(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    /**
-     * Method for reading a batch of values of DOUBLE data type
-     */
-    public int nextBatchDoubles(final FieldVector vector, final int expectedBatchSize,
-                                final int numValsInVector,
-                                final int typeWidth, NullabilityHolder holder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedDoubles(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchOfDoubles(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedIntLongBackedDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchOfIntLongBackedDecimals(vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
     }
-
-    private int getActualBatchSize(int expectedBatchSize) {
-        return Math.min(expectedBatchSize, triplesCount - triplesRead);
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of decimals backed by fixed length byte array parquet data type. Arrow stores all
+   * decimals in 16 bytes. This method provides the necessary padding to the decimals read. Moreover, Arrow interprets
+   * the decimals in Arrow buffer as little endian. Parquet stores fixed length decimals as big endian. So, this method
+   * uses {@link DecimalVector#setBigEndian(int, byte[])} method so that the data in Arrow vector is indeed little
+   * endian.
+   */
+  public int nextBatchFixedLengthDecimal(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    /**
-     * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
-     * Since Arrow stores all decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
-     */
-    public int nextBatchIntLongBackedDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-                                             final int typeWidth, NullabilityHolder nullabilityHolder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedIntLongBackedDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchOfIntLongBackedDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder,
-                    plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchOfFixedLengthDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
     }
-
-    /**
-     * Method for reading a batch of decimals backed by fixed length byte array parquet data type.
-     * Arrow stores all decimals in 16 bytes. This method provides the necessary padding to the decimals read.
-     * Moreover, Arrow interprets the decimals in Arrow buffer as little endian. Parquet stores fixed length decimals
-     * as big endian. So, this method uses {@link DecimalVector#setBigEndian(int, byte[])} method so that the data in
-     * Arrow vector is indeed little endian.
-     */
-    public int nextBatchFixedLengthDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-                                           final int typeWidth, NullabilityHolder nullabilityHolder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchOfFixedLengthDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of variable width data types (ENUM, JSON, UTF8, BSON).
+   */
+  public int nextBatchVarWidthType(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    /**
-     * Method for reading a batch of variable width data type (ENUM, JSON, UTF8, BSON).
-     */
-    public int nextBatchVarWidthType(final FieldVector vector, final int expectedBatchSize, final int numValsInVector
-            , NullabilityHolder nullabilityHolder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedVarWidth(vector, numValsInVector, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchVarWidth(vector, numValsInVector, actualBatchSize, nullabilityHolder, plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedVarWidth(
+          vector,
+          numValsInVector,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchVarWidth(
+          vector,
+          numValsInVector,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
     }
-
-    /**
-     * Method for reading batches of fixed width binary type (e.g. BYTE[7]).
-     * Spark does not support fixed width binary data type. To work around this limitation, the data is read as
-     * fixed width binary from parquet and stored in a {@link VarBinaryVector} in Arrow.
-     */
-    public int nextBatchFixedWidthBinary(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-                                         final int typeWidth, NullabilityHolder nullabilityHolder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        if (eagerDecodeDictionary) {
-            definitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
-        } else {
-            definitionLevelReader.readBatchOfFixedWidthBinary(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, plainValuesReader);
-        }
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading batches of fixed width binary type (e.g. BYTE[7]). Spark does not support fixed width binary
+   * data type. To work around this limitation, the data is read as fixed width binary from parquet and stored in a
+   * {@link VarBinaryVector} in Arrow.
+   */
+  public int nextBatchFixedWidthBinary(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    /**
-     * Method for reading batches of booleans.
-     */
-    public int nextBatchBoolean(final FieldVector vector, final int expectedBatchSize, final int numValsInVector, NullabilityHolder nullabilityHolder) {
-        final int actualBatchSize = getActualBatchSize(expectedBatchSize);
-        if (actualBatchSize <= 0) {
-            return 0;
-        }
-        definitionLevelReader.readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
-                nullabilityHolder, plainValuesReader);
-        triplesRead += actualBatchSize;
-        this.hasNext = triplesRead < triplesCount;
-        return actualBatchSize;
+    if (eagerDecodeDictionary) {
+      definitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dict);
+    } else {
+      definitionLevelReader.readBatchOfFixedWidthBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
     }
-
-    private void advance() {
-        if (triplesRead < triplesCount) {
-            this.hasNext = true;
-        } else {
-            this.hasNext = false;
-        }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading batches of booleans.
+   */
+  public int nextBatchBoolean(
+      final FieldVector vector,
+      final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
     }
-
-    private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
-        ValuesReader previousReader = plainValuesReader;
-        this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded;
-        if (dataEncoding.usesDictionary()) {
-            if (dict == null) {
-                throw new ParquetDecodingException(
-                        "could not read page in col " + desc + " as the dictionary was missing for encoding " + dataEncoding);
-            }
-            try {
-                dictionaryEncodedValuesReader = new VectorizedParquetValuesReader(desc.getMaxDefinitionLevel());//(DictionaryValuesReader) dataEncoding.getDictionaryBasedValuesReader(desc, ValuesType.VALUES, dict);
-                dictionaryEncodedValuesReader.initFromPage(valueCount, in);
-            } catch (IOException e) {
-                throw new ParquetDecodingException("could not read page in col " + desc, e);
-            }
-        } else {
-            plainValuesReader = new BytesReader();
-            plainValuesReader.initFromPage(valueCount, in);
-        }
-        if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
-                previousReader != null && previousReader instanceof RequiresPreviousReader) {
-            // previous reader can only be set if reading sequentially
-            ((RequiresPreviousReader) plainValuesReader).setPreviousReader(previousReader);
-        }
+    definitionLevelReader.readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
+        nullabilityHolder, plainValuesReader);
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  private void advance() {
+    if (triplesRead < triplesCount) {
+      this.hasNext = true;
+    } else {
+      this.hasNext = false;
     }
-
-
-    private void initFromPage(DataPageV1 page) {
-        this.triplesCount = page.getValueCount();
-        ValuesReader rlReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
-        ValuesReader dlReader;
-        int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
-        this.definitionLevelReader = new VectorizedParquetValuesReader(
-                bitWidth, desc.getMaxDefinitionLevel());
-        dlReader = this.definitionLevelReader;
-        this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
-        try {
-            BytesInput bytes = page.getBytes();
-            ByteBufferInputStream in = bytes.toInputStream();
-            rlReader.initFromPage(triplesCount, in);
-            dlReader.initFromPage(triplesCount, in);
-            initDataReader(page.getValueEncoding(), in, page.getValueCount());
-        } catch (IOException e) {
-            throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
-        }
+  }
+
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
+    ValuesReader previousReader = plainValuesReader;
+    this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded;
+    if (dataEncoding.usesDictionary()) {
+      if (dict == null) {
+        throw new ParquetDecodingException(
+            "could not read page in col " + desc + " as the dictionary was missing for encoding " + dataEncoding);
+      }
+      try {
+        dictionaryEncodedValuesReader =
+            new VectorizedParquetValuesReader(desc.getMaxDefinitionLevel());
+        dictionaryEncodedValuesReader.initFromPage(valueCount, in);
+      } catch (IOException e) {
+        throw new ParquetDecodingException("could not read page in col " + desc, e);
+      }
+    } else {
+      plainValuesReader = new ValuesAsBytesReader();
+      plainValuesReader.initFromPage(valueCount, in);
     }
-
-    private void initFromPage(DataPageV2 page) {
-        this.triplesCount = page.getValueCount();
-        this.repetitionLevels = newRLEIterator(desc.getMaxRepetitionLevel(), page.getRepetitionLevels());
-        LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
-        try {
-            int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
-            initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
-            this.definitionLevelReader = new VectorizedParquetValuesReader(bitWidth, false,
-                    desc.getMaxDefinitionLevel());
-            definitionLevelReader.initFromPage(triplesCount, page.getDefinitionLevels().toInputStream());
-        } catch (IOException e) {
-            throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
-        }
+    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
+        previousReader != null && previousReader instanceof RequiresPreviousReader) {
+      // previous reader can only be set if reading sequentially
+      ((RequiresPreviousReader) plainValuesReader).setPreviousReader(previousReader);
     }
-
-    private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
-        try {
-            if (maxLevel == 0) {
-                return new NullIntIterator();
-            }
-            return new RLEIntIterator(
-                    new RunLengthBitPackingHybridDecoder(
-                            BytesUtils.getWidthFromMaxInt(maxLevel),
-                            bytes.toInputStream()));
-        } catch (IOException e) {
-            throw new ParquetDecodingException("could not read levels in page for col " + desc, e);
-        }
+  }
+
+  private void initFromPage(DataPageV1 page) {
+    this.triplesCount = page.getValueCount();
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
+    ValuesReader dlReader;
+    int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+    this.definitionLevelReader = new VectorizedParquetValuesReader(
+        bitWidth, desc.getMaxDefinitionLevel());
+    dlReader = this.definitionLevelReader;
+    this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
+    try {
+      BytesInput bytes = page.getBytes();
+      ByteBufferInputStream in = bytes.toInputStream();
+      rlReader.initFromPage(triplesCount, in);
+      dlReader.initFromPage(triplesCount, in);
+      initDataReader(page.getValueEncoding(), in, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
+    }
+  }
+
+  private void initFromPage(DataPageV2 page) {
+    this.triplesCount = page.getValueCount();
+    this.repetitionLevels = newRLEIterator(desc.getMaxRepetitionLevel(), page.getRepetitionLevels());
+    LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
+    try {
+      int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
+      this.definitionLevelReader = new VectorizedParquetValuesReader(bitWidth, false,
+          desc.getMaxDefinitionLevel());
+      definitionLevelReader.initFromPage(triplesCount, page.getDefinitionLevels().toInputStream());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
     }
+  }
+
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    try {
+      if (maxLevel == 0) {
+        return new NullIntIterator();
+      }
+      return new RLEIntIterator(
+          new RunLengthBitPackingHybridDecoder(
+              BytesUtils.getWidthFromMaxInt(maxLevel),
+              bytes.toInputStream()));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read levels in page for col " + desc, e);
+    }
+  }
+
+  abstract static class IntIterator {
+    abstract int nextInt();
+  }
 
-    static abstract class IntIterator {
-        abstract int nextInt();
+  static class ValuesReaderIntIterator extends IntIterator {
+    ValuesReader delegate;
+
+    ValuesReaderIntIterator(ValuesReader delegate) {
+      super();
+      this.delegate = delegate;
     }
 
-    static class ValuesReaderIntIterator extends IntIterator {
-        ValuesReader delegate;
+    @Override
+    int nextInt() {
+      return delegate.readInteger();
+    }
+  }
 
-        ValuesReaderIntIterator(ValuesReader delegate) {
-            super();
-            this.delegate = delegate;
-        }
+  static class RLEIntIterator extends IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
 
-        @Override
-        int nextInt() {
-            return delegate.readInteger();
-        }
+    RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
     }
 
-    static class RLEIntIterator extends IntIterator {
-        RunLengthBitPackingHybridDecoder delegate;
-
-        RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
-            this.delegate = delegate;
-        }
-
-        @Override
-        int nextInt() {
-            try {
-                return delegate.readInt();
-            } catch (IOException e) {
-                throw new ParquetDecodingException(e);
-            }
-        }
+    @Override
+    int nextInt() {
+      try {
+        return delegate.readInt();
+      } catch (IOException e) {
+        throw new ParquetDecodingException(e);
+      }
     }
+  }
 
-    private static final class NullIntIterator extends IntIterator {
-        @Override
-        int nextInt() {
-            return 0;
-        }
+  private static final class NullIntIterator extends IntIterator {
+    @Override
+    int nextInt() {
+      return 0;
     }
-}
+  }
+}
\ No newline at end of file
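
The fixed-length decimal path above depends on two facts noted in the javadoc: Arrow's DecimalVector stores every value in 16 bytes, and Parquet writes fixed-length decimals big endian. Padding a shorter big-endian two's-complement value to 16 bytes therefore has to sign-extend on the left before the bytes reach setBigEndian. A standalone sketch of that padding step (the padding logic only, not the vector call itself):

    // Sign-extend a big-endian two's-complement value to 16 bytes.
    // Assumes value.length <= 16.
    static byte[] padBigEndianTo16(byte[] value) {
      byte[] padded = new byte[16];
      // Negative values pad with 0xFF on the left, non-negative with 0x00.
      byte pad = (value.length > 0 && value[0] < 0) ? (byte) 0xFF : (byte) 0x00;
      java.util.Arrays.fill(padded, 0, 16 - value.length, pad);
      System.arraycopy(value, 0, padded, 16 - value.length, value.length);
      return padded;
    }
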
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
index 5500352..1745eb0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
@@ -1,25 +1,35 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iceberg.parquet.vectorized;
 
 import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.*;
-import org.apache.iceberg.parquet.BytesReader;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.iceberg.parquet.ValuesAsBytesReader;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
@@ -29,1205 +39,1331 @@ import org.apache.parquet.column.values.bitpacking.BytePacker;
 import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 /**
- * A values reader for Parquet's run-length encoded data that reads column data in batches
- * instead of one value at a time.
- * This is based off of the version in Apache Spark with these changes:
+ * A values reader for Parquet's run-length encoded data that reads column data in batches instead of one value at a
+ * time. This is based on the version in Apache Spark with these changes:
  * <p>
  * <tr>Writes batches of values retrieved to Arrow vectors</tr>
  * <tr>If all pages of a column within the row group are not dictionary encoded, then
- * dictionary ids are eagerly decoded into actual values before writing them
- * to the Arrow vectors</tr>
+ * dictionary ids are eagerly decoded into actual values before writing them to the Arrow vectors</tr>
  * </p>
  */
 public final class VectorizedParquetValuesReader extends ValuesReader {
 
-    // Current decoding mode. The encoded data contains groups of either run length encoded data
-    // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
-    // the number of values in the group.
-    private enum MODE {
-        RLE,
-        PACKED
-    }
+  // Current decoding mode. The encoded data contains groups of either run length encoded data
+  // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
+  // the number of values in the group.
+  private enum MODE {
+    RLE,
+    PACKED
+  }
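
For illustration, a minimal sketch of the group header that MODE models (values are made up; the real decode is in readNextGroup below). Each group in Parquet's RLE/bit-packed hybrid encoding starts with an unsigned varint header whose low bit selects the mode:

    int header = 0x0B;                   // example header byte
    boolean packed = (header & 1) == 1;  // low bit: 0 = RLE, 1 = bit-packed
    int count = header >>> 1;            // RLE: run length; PACKED: count of 8-value groups
    // here packed == true and count == 5, so 5 * 8 = 40 bit-packed values follow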
 
-    // Encoded data.
-    private ByteBufferInputStream in;
+  // Encoded data.
+  private ByteBufferInputStream inputStream;
 
-    // bit/byte width of decoded data and utility to batch unpack them.
-    private int bitWidth;
-    private int bytesWidth;
-    private BytePacker packer;
+  // bit/byte width of decoded data and utility to batch unpack them.
+  private int bitWidth;
+  private int bytesWidth;
+  private BytePacker packer;
 
-    // Current decoding mode and values
-    private MODE mode;
-    private int currentCount;
-    private int currentValue;
+  // Current decoding mode and values
+  private MODE mode;
+  private int currentCount;
+  private int currentValue;
 
-    // Buffer of decoded values if the values are PACKED.
-    private int[] packedValuesBuffer = new int[16];
-    private int packedValuesBufferIdx = 0;
+  // Buffer of decoded values if the values are PACKED.
+  private int[] packedValuesBuffer = new int[16];
+  private int packedValuesBufferIdx = 0;
 
-    // If true, the bit width is fixed. This decoder is used in different places and this also
-    // controls if we need to read the bitwidth from the beginning of the data stream.
-    private final boolean fixedWidth;
-    private final boolean readLength;
-    private final int maxDefLevel;
+  // If true, the bit width is fixed. This decoder is used in different places and this also
+  // controls if we need to read the bitwidth from the beginning of the data stream.
+  private final boolean fixedWidth;
+  private final boolean readLength;
+  private final int maxDefLevel;
 
-    public VectorizedParquetValuesReader(int maxDefLevel) {
-        this.maxDefLevel = maxDefLevel;
-        this.fixedWidth = false;
-        this.readLength = false;
-    }
+  public VectorizedParquetValuesReader(int maxDefLevel) {
+    this.maxDefLevel = maxDefLevel;
+    this.fixedWidth = false;
+    this.readLength = false;
+  }
 
-    public VectorizedParquetValuesReader(
-            int bitWidth,
-            int maxDefLevel) {
-        this.fixedWidth = true;
-        this.readLength = bitWidth != 0;
-        this.maxDefLevel = maxDefLevel;
-        init(bitWidth);
-    }
+  public VectorizedParquetValuesReader(
+      int bitWidth,
+      int maxDefLevel) {
+    this.fixedWidth = true;
+    this.readLength = bitWidth != 0;
+    this.maxDefLevel = maxDefLevel;
+    init(bitWidth);
+  }
 
-    public VectorizedParquetValuesReader(
-            int bitWidth,
-            boolean readLength,
-            int maxDefLevel) {
-        this.fixedWidth = true;
-        this.readLength = readLength;
-        this.maxDefLevel = maxDefLevel;
-        init(bitWidth);
-    }
+  public VectorizedParquetValuesReader(
+      int bitWidth,
+      boolean readLength,
+      int maxDefLevel) {
+    this.fixedWidth = true;
+    this.readLength = readLength;
+    this.maxDefLevel = maxDefLevel;
+    init(bitWidth);
+  }
 
-    @Override
-    public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
-        this.in = in;
-        if (fixedWidth) {
-            // initialize for repetition and definition levels
-            if (readLength) {
-                int length = readIntLittleEndian();
-                this.in = in.sliceStream(length);
-            }
-        } else {
-            // initialize for values
-            if (in.available() > 0) {
-                init(in.read());
-            }
-        }
-        if (bitWidth == 0) {
-            // 0 bit width, treat this as an RLE run of valueCount number of 0's.
-            this.mode = MODE.RLE;
-            this.currentCount = valueCount;
-            this.currentValue = 0;
-        } else {
-            this.currentCount = 0;
-        }
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    this.inputStream = in;
+    if (fixedWidth) {
+      // initialize for repetition and definition levels
+      if (readLength) {
+        int length = readIntLittleEndian();
+        this.inputStream = in.sliceStream(length);
+      }
+    } else {
+      // initialize for values
+      if (in.available() > 0) {
+        init(in.read());
+      }
     }
-
-
-    /**
-     * Initializes the internal state for decoding ints of `bitWidth`.
-     */
-    private void init(int bitWidth) {
-        Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
-        this.bitWidth = bitWidth;
-        this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
-        this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+    if (bitWidth == 0) {
+      // 0 bit width, treat this as an RLE run of valueCount number of 0's.
+      this.mode = MODE.RLE;
+      this.currentCount = valueCount;
+      this.currentValue = 0;
+    } else {
+      this.currentCount = 0;
     }
+  }
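
Worked example of the zero-bit-width path above: a required column has maximum definition level 0, so its definition levels need 0 bits. In that case initFromPage reads nothing and models the page as one RLE run of valueCount zeros, so every subsequent readInteger() returns 0 until the page is exhausted.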
 
-    /**
-     * Reads the next varint encoded int.
-     */
-    private int readUnsignedVarInt() throws IOException {
-        int value = 0;
-        int shift = 0;
-        int b;
-        do {
-            b = in.read();
-            value |= (b & 0x7F) << shift;
-            shift += 7;
-        } while ((b & 0x80) != 0);
-        return value;
-    }
+  /**
+   * Initializes the internal state for decoding ints of `bitWidth`.
+   */
+  private void init(int newBitWidth) {
+    Preconditions.checkArgument(newBitWidth >= 0 && newBitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+    this.bitWidth = newBitWidth;
+    this.bytesWidth = BytesUtils.paddedByteCountFromBits(newBitWidth);
+    this.packer = Packer.LITTLE_ENDIAN.newBytePacker(newBitWidth);
+  }
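
A worked example of the byte padding above, assuming Parquet's BytesUtils.paddedByteCountFromBits rounds a bit count up to whole bytes:

    int newBitWidth = 17;
    int bytesWidth = (newBitWidth + 7) / 8;  // 3, matching paddedByteCountFromBits(17)
    // bitWidth 3 -> 1 byte, 12 -> 2 bytes, 17 -> 3 bytes, 32 -> 4 bytes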
 
-    /**
-     * Reads the next 4 byte little endian int.
-     */
-    private int readIntLittleEndian() throws IOException {
-        int ch4 = in.read();
-        int ch3 = in.read();
-        int ch2 = in.read();
-        int ch1 = in.read();
-        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-    }
+  /**
+   * Reads the next varint encoded int.
+   */
+  private int readUnsignedVarInt() throws IOException {
+    int value = 0;
+    int shift = 0;
+    int byteRead;
+    do {
+      byteRead = inputStream.read();
+      value |= (byteRead & 0x7F) << shift;
+      shift += 7;
+    } while ((byteRead & 0x80) != 0);
+    return value;
+  }
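
A self-contained sketch of the same LEB128-style varint decode, for reference (the class and variable names are illustrative, not from this codebase):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    final class VarIntExample {
      static int readUnsignedVarInt(InputStream in) throws IOException {
        int value = 0;
        int shift = 0;
        int currentByte;
        do {
          currentByte = in.read();
          value |= (currentByte & 0x7F) << shift;  // low 7 bits carry payload
          shift += 7;
        } while ((currentByte & 0x80) != 0);       // high bit set means more bytes follow
        return value;
      }

      public static void main(String[] args) throws IOException {
        // 0x96 0x01 encodes 150: 0x16 (22) plus (0x01 << 7) (128)
        InputStream in = new ByteArrayInputStream(new byte[] {(byte) 0x96, 0x01});
        System.out.println(readUnsignedVarInt(in));  // prints 150
      }
    }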
 
-    /**
-     * Reads the next byteWidth little endian int.
-     */
-    private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
-        switch (bytesWidth) {
-            case 0:
-                return 0;
-            case 1:
-                return in.read();
-            case 2: {
-                int ch2 = in.read();
-                int ch1 = in.read();
-                return (ch1 << 8) + ch2;
-            }
-            case 3: {
-                int ch3 = in.read();
-                int ch2 = in.read();
-                int ch1 = in.read();
-                return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
-            }
-            case 4: {
-                return readIntLittleEndian();
-            }
-        }
-        throw new RuntimeException("Unreachable");
+  /**
+   * Reads the next 4 byte little endian int.
+   */
+  private int readIntLittleEndian() throws IOException {
+    int ch4 = inputStream.read();
+    int ch3 = inputStream.read();
+    int ch2 = inputStream.read();
+    int ch1 = inputStream.read();
+    return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0);
+  }
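
Worked example of the reassembly above: little-endian bytes 0x78 0x56 0x34 0x12 yield (0x12 << 24) + (0x34 << 16) + (0x56 << 8) + 0x78 = 0x12345678.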
+
+  /**
+   * Reads the next byteWidth little endian int.
+   */
+  private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
+    switch (bytesWidth) {
+      case 0:
+        return 0;
+      case 1:
+        return inputStream.read();
+      case 2: {
+        int ch2 = inputStream.read();
+        int ch1 = inputStream.read();
+        return (ch1 << 8) + ch2;
+      }
+      case 3: {
+        int ch3 = inputStream.read();
+        int ch2 = inputStream.read();
+        int ch1 = inputStream.read();
+        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+      }
+      case 4: {
+        return readIntLittleEndian();
+      }
     }
+    throw new RuntimeException("Unreachable");
+  }
 
-    /**
-     * Reads the next group.
-     */
-    private void readNextGroup() {
-        try {
-            int header = readUnsignedVarInt();
-            this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
-            switch (mode) {
-                case RLE:
-                    this.currentCount = header >>> 1;
-                    this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-                    return;
-                case PACKED:
-                    int numGroups = header >>> 1;
-                    this.currentCount = numGroups * 8;
+  /**
+   * Reads the next group.
+   */
+  private void readNextGroup() {
+    try {
+      int header = readUnsignedVarInt();
+      this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+      switch (mode) {
+        case RLE:
+          this.currentCount = header >>> 1;
+          this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+          return;
+        case PACKED:
+          int numGroups = header >>> 1;
+          this.currentCount = numGroups * 8;
 
-                    if (this.packedValuesBuffer.length < this.currentCount) {
-                        this.packedValuesBuffer = new int[this.currentCount];
-                    }
-                    packedValuesBufferIdx = 0;
-                    int valueIndex = 0;
-                    while (valueIndex < this.currentCount) {
-                        // values are bit packed 8 at a time, so reading bitWidth will always work
-                        ByteBuffer buffer = in.slice(bitWidth);
-                        this.packer.unpack8Values(buffer, buffer.position(), this.packedValuesBuffer, valueIndex);
-                        valueIndex += 8;
-                    }
-                    return;
-                default:
-                    throw new ParquetDecodingException("not a valid mode " + this.mode);
-            }
-        } catch (IOException e) {
-            throw new ParquetDecodingException("Failed to read from input stream", e);
-        }
+          if (this.packedValuesBuffer.length < this.currentCount) {
+            this.packedValuesBuffer = new int[this.currentCount];
+          }
+          packedValuesBufferIdx = 0;
+          int valueIndex = 0;
+          while (valueIndex < this.currentCount) {
+            // values are bit packed 8 at a time, so reading bitWidth will always work
+            ByteBuffer buffer = inputStream.slice(bitWidth);
+            this.packer.unpack8Values(buffer, buffer.position(), this.packedValuesBuffer, valueIndex);
+            valueIndex += 8;
+          }
+          return;
+        default:
+          throw new ParquetDecodingException("not a valid mode " + this.mode);
+      }
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read from input stream", e);
     }
+  }
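
A minimal, runnable sketch of the 8-at-a-time unpacking used above, built on the same parquet-column classes (the class and variable names are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.parquet.column.values.bitpacking.BytePacker;
    import org.apache.parquet.column.values.bitpacking.Packer;

    final class Unpack8Example {
      public static void main(String[] args) {
        int bitWidth = 3;
        BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
        int[] values = {0, 1, 2, 3, 4, 5, 6, 7};
        byte[] packed = new byte[bitWidth];  // 8 values * 3 bits = 24 bits = 3 bytes
        packer.pack8Values(values, 0, packed, 0);
        int[] unpacked = new int[8];
        packer.unpack8Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
        // unpacked now equals {0, 1, 2, 3, 4, 5, 6, 7}
      }
    }

This is why PACKED groups always hold a multiple of 8 values: the layout stores bitWidth bytes per 8 values, so slicing bitWidth bytes per iteration always works.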
 
-    @Override
-    public boolean readBoolean() {
-        return this.readInteger() != 0;
-    }
+  @Override
+  public boolean readBoolean() {
+    return this.readInteger() != 0;
+  }
 
-    @Override
-    public void skip() {
-        this.readInteger();
-    }
+  @Override
+  public void skip() {
+    this.readInteger();
+  }
 
-    @Override
-    public int readValueDictionaryId() {
-        return readInteger();
-    }
+  @Override
+  public int readValueDictionaryId() {
+    return readInteger();
+  }
 
-    @Override
-    public int readInteger() {
-        if (this.currentCount == 0) {
-            this.readNextGroup();
-        }
+  @Override
+  public int readInteger() {
+    if (this.currentCount == 0) {
+      this.readNextGroup();
+    }
 
-        this.currentCount--;
-        switch (mode) {
-            case RLE:
-                return this.currentValue;
-            case PACKED:
-                return this.packedValuesBuffer[packedValuesBufferIdx++];
-        }
-        throw new RuntimeException("Unreachable");
+    this.currentCount--;
+    switch (mode) {
+      case RLE:
+        return this.currentValue;
+      case PACKED:
+        return this.packedValuesBuffer[packedValuesBufferIdx++];
     }
+    throw new RuntimeException("Unreachable");
+  }
 
-    public void readBatchOfDictionaryIds(
-            final IntVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
+  public void readBatchOfDictionaryIds(
+      final IntVector vector,
+      final int numValsInVector,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader dictionaryEncodedValuesReader) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int numValues = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, idx, numValues);
+          } else {
+            setNulls(nullabilityHolder, idx, numValues, vector.getValidityBuffer());
+          }
+          idx += numValues;
+          break;
+        case PACKED:
+          for (int i = 0; i < numValues; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.set(idx, dictionaryEncodedValuesReader.readInteger());
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
             }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, idx, n);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            vector.set(idx, dictionaryEncodedValuesReader.readInteger());
-                        } else {
-                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
-                        }
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= numValues;
+      currentCount -= numValues;
     }
+  }
 
-    // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
-    // check definition level.
-    private void readDictionaryIdsInternal(final IntVector c, final int numValsInVector, final int numValuesToRead) {
-        int left = numValuesToRead;
-        int idx = numValsInVector;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        c.set(idx, currentValue);
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        c.set(idx, packedValuesBuffer[packedValuesBufferIdx]);
-                        packedValuesBufferIdx++;
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
+  // check definition level.
+  private void readDictionaryIdsInternal(
+      final IntVector intVector,
+      final int numValsInVector,
+      final int numValuesToRead) {
+    int left = numValuesToRead;
+    int idx = numValsInVector;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int numValues = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < numValues; i++) {
+            intVector.set(idx, currentValue);
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < numValues; i++) {
+            intVector.set(idx, packedValuesBuffer[packedValuesBufferIdx]);
+            packedValuesBufferIdx++;
+            idx++;
+          }
+          break;
+      }
+      left -= numValues;
+      currentCount -= numValues;
     }
+  }
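
For context, an illustration (not taken from this code): with maxDefLevel == 1, the definition levels are themselves RLE-encoded, so a run of 1s marks a stretch of non-null rows and a run of 0s a stretch of nulls. A hypothetical page with 10 rows might decode as:

    // RLE(value = 1, count = 6) -> rows 0..5 non-null: copy 6 dictionary ids in one go
    // RLE(value = 0, count = 4) -> rows 6..9 null: set 4 validity bits / nullability flags

The RLE branches above exploit exactly this: one comparison of currentValue against maxDefLevel decides the fate of the whole run.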
 
-    public void readBatchOfIntegers(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    setNextNValuesInVector(
-                            typeWidth,
-                            maxDefLevel,
-                            nullabilityHolder,
-                            valuesReader,
-                            bufferIdx,
-                            vector,
-                            n);
-                    bufferIdx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; ++i) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+  public void readBatchOfLongs(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int numValues = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          setNextNValuesInVector(
+              typeWidth,
+              nullabilityHolder,
+              valuesReader,
+              bufferIdx,
+              vector,
+              numValues);
+          bufferIdx += numValues;
+          break;
+        case PACKED:
+          for (int i = 0; i < numValues; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setValue(
+                  typeWidth,
+                  valuesReader,
+                  bufferIdx,
+                  vector.getValidityBuffer(),
+                  vector.getDataBuffer());
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= numValues;
+      currentCount -= numValues;
     }
+  }
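
The same draining loop appears in every readBatchOf* method; a self-contained sketch (the run lengths and names are made up for illustration):

    final class DrainLoopExample {
      public static void main(String[] args) {
        int[] runLengths = {4, 6, 5};  // stands in for successive readNextGroup() calls
        int group = 0;
        int currentCount = 0;
        int left = 8;                  // batchSize
        while (left > 0) {
          if (currentCount == 0) {
            currentCount = runLengths[group++];  // refill from the next group
          }
          int numValues = Math.min(left, currentCount);
          System.out.println("write " + numValues + " values");
          left -= numValues;
          currentCount -= numValues;
        }
        // prints "write 4 values" twice; the second run carries over into the next batch
      }
    }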
 
-    public void readBatchOfDictionaryEncodedIntegers(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            vector.getDataBuffer().setInt(idx, dict.decodeToInt(valuesReader.readInteger()));
-                        } else {
-                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
-                        }
-                        idx++;
-                    }
-                    break;
+  public void readBatchOfDictionaryEncodedLongs(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int typeWidth,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader valuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int numValues = Math.min(left, this.currentCount);
+      ArrowBuf validityBuffer = vector.getValidityBuffer();
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, numValues, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, numValues, validityBuffer);
+          }
+          idx += numValues;
+          break;
+        case PACKED:
+          for (int i = 0; i < numValues; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(valuesReader.readInteger()));
+            } else {
+              setNull(nullabilityHolder, idx, validityBuffer);
             }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= numValues;
+      currentCount -= numValues;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedIntegersInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            ArrowBuf dataBuffer = vector.getDataBuffer();
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        dataBuffer.setInt(idx, dict.decodeToInt(currentValue));
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        dataBuffer.setInt(idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedLongsInternal(
+      FieldVector vector,
+      int typeWidth,
+      int index,
+      int numValuesToRead,
+      Dictionary dict) {
+    int left = numValuesToRead;
+    int idx = index;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int numValues = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < numValues; i++) {
+            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(currentValue));
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < numValues; i++) {
+            vector.getDataBuffer()
+                .setLong(idx * typeWidth, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+            idx++;
+          }
+          break;
+      }
+      left -= numValues;
+      currentCount -= numValues;
     }
+  }
 
-    public void readBatchOfLongs(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    setNextNValuesInVector(
-                            typeWidth,
-                            maxDefLevel,
-                            nullabilityHolder,
-                            valuesReader,
-                            bufferIdx,
-                            vector,
-                            n);
-                    bufferIdx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; ++i) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+  public void readBatchOfIntegers(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          setNextNValuesInVector(
+              typeWidth,
+              nullabilityHolder,
+              valuesReader,
+              bufferIdx,
+              vector,
+              n);
+          bufferIdx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfDictionaryEncodedLongs(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            ArrowBuf validityBuffer = vector.getValidityBuffer();
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, validityBuffer);
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            vector.getDataBuffer().setLong(idx, dict.decodeToLong(valuesReader.readInteger()));
-                        } else {
-                            setNull(nullabilityHolder, idx, validityBuffer);
-                        }
-                        idx++;
-                    }
-                    break;
+  public void readBatchOfDictionaryEncodedIntegers(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int typeWidth,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader valuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+          }
+          idx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer().setInt(idx * typeWidth, dict.decodeToInt(valuesReader.readInteger()));
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedLongsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentValue));
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setLong(idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedIntegersInternal(
+      FieldVector vector,
+      int typeWidth,
+      int index,
+      int numValuesToRead,
+      Dictionary dict) {
+    int left = numValuesToRead;
+    int idx = index;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      ArrowBuf dataBuffer = vector.getDataBuffer();
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < n; i++) {
+            dataBuffer.setInt(idx * typeWidth, dict.decodeToInt(currentValue));
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            dataBuffer.setInt(idx * typeWidth, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfFloats(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    setNextNValuesInVector(
-                            typeWidth,
-                            maxDefLevel,
-                            nullabilityHolder,
-                            valuesReader,
-                            bufferIdx,
-                            vector,
-                            n);
-                    bufferIdx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; ++i) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+  public void readBatchOfFloats(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          setNextNValuesInVector(
+              typeWidth,
+              nullabilityHolder,
+              valuesReader,
+              bufferIdx,
+              vector,
+              n);
+          bufferIdx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void setValue(int typeWidth, BytesReader valuesReader, int bufferIdx, ArrowBuf validityBuffer, ArrowBuf dataBuffer) {
-        dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
-        BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx);
-        bufferIdx++;
-    }
+  private void setValue(
+      int typeWidth,
+      ValuesAsBytesReader valuesReader,
+      int bufferIdx,
+      ArrowBuf validityBuffer,
+      ArrowBuf dataBuffer) {
+    dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
+    BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx);
+  }
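
Side note, illustrative only: ArrowBuf setters such as setBytes and setLong are byte-addressed, while FieldVector.set is element-addressed, which is why fixed-width writes multiply the element index by typeWidth. A minimal sketch, assuming an Arrow release where RootAllocator and BigIntVector live in these packages:

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;
    import org.apache.arrow.vector.BitVectorHelper;

    final class ByteOffsetExample {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             BigIntVector vector = new BigIntVector("v", allocator)) {
          vector.allocateNew(4);
          int idx = 2;
          int typeWidth = 8;  // a long is 8 bytes wide
          vector.getDataBuffer().setLong(idx * typeWidth, 42L);  // byte offset into the buffer
          BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
          vector.setValueCount(4);
          System.out.println(vector.get(2));  // prints 42
        }
      }
    }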
 
-    public void readBatchOfDictionaryEncodedFloats(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            ArrowBuf validityBuffer = vector.getValidityBuffer();
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, validityBuffer);
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(valuesReader.readInteger()));
-                        } else {
-                            setNull(nullabilityHolder, idx, validityBuffer);
-                        }
-                        idx++;
-                    }
-                    break;
+  public void readBatchOfDictionaryEncodedFloats(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int typeWidth,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader valuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      ArrowBuf validityBuffer = vector.getValidityBuffer();
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, n, validityBuffer);
+          }
+          idx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(valuesReader.readInteger()));
+            } else {
+              setNull(nullabilityHolder, idx, validityBuffer);
             }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedFloatsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentValue));
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedFloatsInternal(
+      FieldVector vector,
+      int typeWidth,
+      int index,
+      int numValuesToRead,
+      Dictionary dict) {
+    int left = numValuesToRead;
+    int idx = index;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(currentValue));
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfDoubles(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-            BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    setNextNValuesInVector(
-                            typeWidth,
-                            maxDefLevel,
-                            nullabilityHolder,
-                            valuesReader,
-                            bufferIdx,
-                            vector,
-                            n);
-                    bufferIdx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; ++i) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+  public void readBatchOfDoubles(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          setNextNValuesInVector(
+              typeWidth,
+              nullabilityHolder,
+              valuesReader,
+              bufferIdx,
+              vector,
+              n);
+          bufferIdx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfDictionaryEncodedDoubles(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(valuesReader.readInteger()));
-                        } else {
-                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
-                        }
-                        idx++;
-                    }
-                    break;
+  public void readBatchOfDictionaryEncodedDoubles(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int typeWidth,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader valuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+          }
+          idx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(valuesReader.readInteger()));
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedDoublesInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentValue));
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedDoublesInternal(
+      FieldVector vector,
+      int typeWidth,
+      int index,
+      int numValuesToRead,
+      Dictionary dict) {
+    int left = numValuesToRead;
+    int idx = index;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(currentValue));
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfFixedWidthBinary(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-            BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
+  public void readBatchOfFixedWidthBinary(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
+              setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+              bufferIdx++;
             }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        for (int i = 0; i < n; i++) {
-                            setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
-                            bufferIdx++;
-                        }
-                    } else {
-                        setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
-                        bufferIdx += n;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+            bufferIdx += n;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfDictionaryEncodedFixedWidthBinary(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
-                        } else {
-                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
-                        }
-                        idx++;
-                    }
-                    break;
+  public void readBatchOfDictionaryEncodedFixedWidthBinary(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int typeWidth,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader valuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+          }
+          idx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer()
+                  .setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
-                        BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
-                        BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(
+      FieldVector vector,
+      int typeWidth,
+      int idx,
+      int numValuesToRead,
+      Dictionary dict) {
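+    // This method runs on the dictionary-ids reader, so mode, currentValue and
+    // packedValuesBuffer here describe runs of dictionary ids rather than definition levels.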
+    int left = numValuesToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
+            BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            vector.getDataBuffer()
+                .setBytes(
+                    idx * typeWidth,
+                    dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+            BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfFixedLengthDecimals(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-            BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
+  public void readBatchOfFixedLengthDecimals(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
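+              // The unscaled decimal is stored big-endian in typeWidth bytes; left-pad it
+              // into a zero-initialized array of DecimalVector.TYPE_WIDTH bytes before
+              // calling setBigEndian.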
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+              bufferIdx++;
             }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        for (int i = 0; i < n; i++) {
-                            byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-                            valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-                            ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
-                            bufferIdx++;
-                        }
-                    } else {
-                        setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
-                        bufferIdx += n;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; ++i) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-                            valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-                            ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+            bufferIdx += n;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfDictionaryEncodedFixedLengthDecimals(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
-                            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-                            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-                            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
-                        } else {
-                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
-                        }
-                        idx++;
-                    }
-                    break;
+  public void readBatchOfDictionaryEncodedFixedLengthDecimals(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int typeWidth,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader valuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+          }
+          idx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
+              byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+              System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        // TODO: samarth I am assuming/hopeful that the decimalBytes array has typeWidth length
-                        byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
-                        byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-                        System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-                        ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        // TODO: samarth I am assuming/hopeful that the decimal bytes has typeWidth length
-                        byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
-                        byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-                        System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
-                        ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(
+      FieldVector vector,
+      int typeWidth,
+      int idx,
+      int numValuesToRead,
+      Dictionary dict) {
+    int left = numValuesToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < n; i++) {
+            // TODO(samarth): verify that decimalBytes returned by the dictionary is typeWidth bytes long
+            byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
+            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            // TODO(samarth): verify that the decoded decimal bytes are typeWidth bytes long
+            byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
+            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    /**
-     * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
-     * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
-     * vector. It appropriately sets the validity buffer in the Arrow vector.
-     */
-    public void readBatchVarWidth(
-            final FieldVector vector,
-            final int numValsInVector,
-            final int batchSize,
-            NullabilityHolder nullabilityHolder,
-            BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
+  /**
+   * Method for reading a batch of variable-width binary data (BINARY, STRING). This method reads
+   * batches of bytes from Parquet and writes them into the data buffer underneath the Arrow vector,
+   * growing the buffer via setValueLengthSafe as needed. It appropriately sets the validity buffer
+   * in the Arrow vector.
+   */
+  public void readBatchVarWidth(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
+              setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
+              bufferIdx++;
             }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        for (int i = 0; i < n; i++) {
-                            setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
-                            bufferIdx++;
-                        }
-                    } else {
-                        setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
-                        bufferIdx += n;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+            bufferIdx += n;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void setVarWidthBinaryValue(FieldVector vector, BytesReader valuesReader, int bufferIdx) {
-        int len = valuesReader.readInteger();
-        ByteBuffer buffer = valuesReader.getBuffer(len);
-        // Calling setValueLengthSafe takes care of allocating a larger buffer if
-        // running out of space.
-        ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
-        // It is possible that the data buffer was reallocated. So it is important to
-        // not cache the data buffer reference but instead use vector.getDataBuffer().
-        vector.getDataBuffer().writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
-        // Similarly, we need to get the latest reference to the validity buffer as well
-        // since reallocation changes reference of the validity buffers as well.
-        BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-    }
+  private void setVarWidthBinaryValue(FieldVector vector, ValuesAsBytesReader valuesReader, int bufferIdx) {
+    int len = valuesReader.readInteger();
+    ByteBuffer buffer = valuesReader.getBuffer(len);
+    // Calling setValueLengthSafe takes care of allocating a larger buffer if
+    // running out of space.
+    ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
+    // It is possible that the data buffer was reallocated. So it is important to
+    // not cache the data buffer reference but instead use vector.getDataBuffer().
+    vector.getDataBuffer().writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
+    // Similarly, we need to get the latest reference to the validity buffer as well
+    // since reallocation changes reference of the validity buffers as well.
+    BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+  }
 
-    public void readBatchOfDictionaryEncodedVarWidth(
-            final FieldVector vector, final int numValsInVector,
-            final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
-            }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe());
-                        } else {
-                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
-                        }
-                        idx++;
-                    }
-                    break;
+  public void readBatchOfDictionaryEncodedVarWidth(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader dictionaryEncodedValuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, idx, n, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+          }
+          idx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              ((BaseVariableWidthVector) vector).setSafe(
+                  idx,
+                  dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe());
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(FieldVector vector, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(
+      FieldVector vector,
+      int idx,
+      int numValuesToRead,
+      Dictionary dict) {
+    int left = numValuesToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < n; i++) {
+            ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            ((BaseVariableWidthVector) vector).setSafe(
+                idx,
+                dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfIntLongBackedDecimals(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
-            BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
+  public void readBatchOfIntLongBackedDecimals(
+      final FieldVector vector, final int numValsInVector,
+      final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+      ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
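+              // Copy the little-endian INT32/INT64 unscaled value into the low-order bytes
+              // of the DecimalVector.TYPE_WIDTH-wide decimal.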
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+              vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+              bufferIdx++;
             }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        for (int i = 0; i < n; i++) {
-                            byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-                            valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-                            vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
-                            BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
-                            bufferIdx++;
-                        }
-                    } else {
-                        setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
-                        bufferIdx += n;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; ++i) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
-                            valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-                            vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
-                            bufferIdx++;
-                            //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-                        } else {
-                            //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-                            nullabilityHolder.setNull(bufferIdx);
-                            bufferIdx++;
-                        }
-                    }
-                    break;
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+            bufferIdx += n;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+              vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+            }
+            bufferIdx++;
-            left -= n;
-            currentCount -= n;
-        }
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
-            final FieldVector vector, final int numValsInVector,
-            final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
-        int idx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
+  public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
+      final FieldVector vector,
+      final int numValsInVector,
+      final int typeWidth,
+      final int batchSize,
+      NullabilityHolder nullabilityHolder,
+      VectorizedParquetValuesReader valuesReader,
+      Dictionary dict) {
+    int idx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
+          } else {
+            setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+          }
+          idx += n;
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              ((DecimalVector) vector).set(
+                  idx,
+                  (typeWidth == Integer.BYTES
+                      ? dict.decodeToInt(valuesReader.readInteger())
+                      : dict.decodeToLong(valuesReader.readInteger())));
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
             }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
-                    } else {
-                        setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
-                    }
-                    idx += n;
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(valuesReader.readInteger()) : dict.decodeToLong(valuesReader.readInteger())));
-                        } else {
-                            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
-                        }
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(FieldVector vector, final int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
-        int left = numValuesToRead;
-        while (left > 0) {
-            if (this.currentCount == 0) this.readNextGroup();
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    for (int i = 0; i < n; i++) {
-                        ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
-                        idx++;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; i++) {
-                        ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++])));
-                        idx++;
-                    }
-                    break;
-            }
-            left -= n;
-            currentCount -= n;
-        }
+  private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(
+      FieldVector vector,
+      final int typeWidth,
+      int idx,
+      int numValuesToRead,
+      Dictionary dict) {
+    int left = numValuesToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < n; i++) {
+            ((DecimalVector) vector).set(
+                idx,
+                typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; i++) {
+            int dictId = packedValuesBuffer[packedValuesBufferIdx++];
+            ((DecimalVector) vector).set(
+                idx,
+                typeWidth == Integer.BYTES ? dict.decodeToInt(dictId) : dict.decodeToLong(dictId));
+            idx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    public void readBatchOfBooleans(
-            final FieldVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
-        int bufferIdx = numValsInVector;
-        int left = batchSize;
-        while (left > 0) {
-            if (this.currentCount == 0) {
-                this.readNextGroup();
+  public void readBatchOfBooleans(
+      final FieldVector vector, final int numValsInVector, final int batchSize,
+      NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+    int bufferIdx = numValsInVector;
+    int left = batchSize;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < n; i++) {
+              ((BitVector) vector).setSafe(bufferIdx, valuesReader.readBoolean() ? 1 : 0);
+              bufferIdx++;
             }
-            int n = Math.min(left, this.currentCount);
-            switch (mode) {
-                case RLE:
-                    if (currentValue == maxDefLevel) {
-                        for (int i = 0; i < n; i++) {
-                            ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
-                            bufferIdx++;
-                        }
-                    } else {
-                        setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
-                        bufferIdx += n;
-                    }
-                    break;
-                case PACKED:
-                    for (int i = 0; i < n; ++i) {
-                        if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-                            ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
-                        } else {
-                            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
-                        }
-                        bufferIdx++;
-                    }
-                    break;
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+            bufferIdx += n;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              ((BitVector) vector).setSafe(bufferIdx, valuesReader.readBoolean() ? 1 : 0);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
-            left -= n;
-            currentCount -= n;
-        }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= n;
+      currentCount -= n;
     }
+  }
 
-    private void setBinaryInVector(VarBinaryVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
-        byte[] byteArray = new byte[typeWidth];
-        valuesReader.getBuffer(typeWidth).get(byteArray);
-        vector.setSafe(bufferIdx, byteArray);
-    }
+  private void setBinaryInVector(
+      VarBinaryVector vector,
+      int typeWidth,
+      ValuesAsBytesReader valuesReader,
+      int bufferIdx) {
+    byte[] byteArray = new byte[typeWidth];
+    valuesReader.getBuffer(typeWidth).get(byteArray);
+    vector.setSafe(bufferIdx, byteArray);
+  }
 
-    private void setNextNValuesInVector(
-            int typeWidth, int maxDefLevel, NullabilityHolder nullabilityHolder,
-            BytesReader valuesReader, int bufferIdx, FieldVector vector, int n) {
-        ArrowBuf validityBuffer = vector.getValidityBuffer();
-        int validityBufferIdx = bufferIdx;
-        if (currentValue == maxDefLevel) {
-            for (int i = 0; i < n; i++) {
-                BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-                validityBufferIdx++;
-            }
-            ByteBuffer buffer = valuesReader.getBuffer(n * typeWidth);
-            vector.getDataBuffer().setBytes(bufferIdx * typeWidth, buffer);
-        } else {
-            setNulls(nullabilityHolder, bufferIdx, n, validityBuffer);
-        }
+  private void setNextNValuesInVector(
+      int typeWidth, NullabilityHolder nullabilityHolder,
+      ValuesAsBytesReader valuesReader, int bufferIdx, FieldVector vector, int numValues) {
+    ArrowBuf validityBuffer = vector.getValidityBuffer();
+    int validityBufferIdx = bufferIdx;
+    if (currentValue == maxDefLevel) {
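+      // Every value in this RLE run is non-null: set each validity bit, then bulk-copy
+      // numValues * typeWidth bytes from the Parquet page straight into the data buffer.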
+      for (int i = 0; i < numValues; i++) {
+        BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+        validityBufferIdx++;
+      }
+      ByteBuffer buffer = valuesReader.getBuffer(numValues * typeWidth);
+      vector.getDataBuffer().setBytes(bufferIdx * typeWidth, buffer);
+    } else {
+      setNulls(nullabilityHolder, bufferIdx, numValues, validityBuffer);
     }
+  }
 
-    private void setNull(NullabilityHolder nullabilityHolder, int bufferIdx, ArrowBuf validityBuffer) {
-        nullabilityHolder.setNull(bufferIdx);
-        BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
-    }
+  private void setNull(NullabilityHolder nullabilityHolder, int bufferIdx, ArrowBuf validityBuffer) {
+    nullabilityHolder.setNull(bufferIdx);
+    BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+  }
 
-    private void setNulls(NullabilityHolder nullabilityHolder, int bufferIdx, int n, ArrowBuf validityBuffer) {
-        for (int i = 0; i < n; i++) {
-            nullabilityHolder.setNull(bufferIdx);
-            BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
-            bufferIdx++;
-        }
+  private void setNulls(NullabilityHolder nullabilityHolder, int idx, int numValues, ArrowBuf validityBuffer) {
+    int bufferIdx = idx;
+    for (int i = 0; i < numValues; i++) {
+      nullabilityHolder.setNull(bufferIdx);
+      BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+      bufferIdx++;
     }
-
-}
+  }
+}
\ No newline at end of file
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 35bb5c7..4962ee8 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -19,6 +19,10 @@
 
 package org.apache.iceberg.spark.data;
 
+import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+import static scala.collection.JavaConverters.seqAsJavaListConverter;
+
 import com.google.common.collect.Lists;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -33,12 +37,11 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
+import org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.serde2.io.DateWritable;
@@ -62,10 +65,6 @@ import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import scala.collection.Seq;
 
-import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
-import static scala.collection.JavaConverters.mapAsJavaMapConverter;
-import static scala.collection.JavaConverters.seqAsJavaListConverter;
-
 @SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
 public class TestHelpers {
 
diff --git a/versions.props b/versions.props
index e418732..40c0d07 100644
--- a/versions.props
+++ b/versions.props
@@ -10,6 +10,7 @@ org.apache.spark:spark-hive_2.11 = 2.4.0
 org.apache.pig:pig = 0.14.0
 com.fasterxml.jackson.*:* = 2.7.9
 com.github.ben-manes.caffeine:caffeine = 2.7.0
+org.apache.arrow:arrow-vector = 0.14.1
 
 # test deps
 junit:junit = 4.12