Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/30 17:50:08 UTC

[incubator-iceberg] branch master updated: Vectorization: Parquet additions to support batch reads (#710)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c88ef1  Vectorization: Parquet additions to support batch reads (#710)
3c88ef1 is described below

commit 3c88ef1d89800dde9edb61e59b851e64ffef0f33
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Mon Dec 30 09:50:00 2019 -0800

    Vectorization: Parquet additions to support batch reads (#710)
    
    Co-authored-by: gautamkowshik@gmail.com
    Co-authored-by: anjalinorwood@gmail.com
---
 .../java/org/apache/iceberg/parquet/Parquet.java   |  29 ++-
 .../parquet/ParquetDictionaryRowGroupFilter.java   |  32 +---
 .../org/apache/iceberg/parquet/ParquetReader.java  | 111 +----------
 .../org/apache/iceberg/parquet/ParquetUtil.java    |  32 +++-
 .../java/org/apache/iceberg/parquet/ReadConf.java  | 203 +++++++++++++++++++++
 .../iceberg/parquet/ValuesAsBytesReader.java       |  87 +++++++++
 .../iceberg/parquet/VectorizedParquetReader.java   | 150 +++++++++++++++
 .../apache/iceberg/parquet/VectorizedReader.java   |  51 ++++++
 8 files changed, 549 insertions(+), 146 deletions(-)
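
For orientation, this commit wires batch reads into the existing ReadBuilder. A minimal
usage sketch, assuming the standard Parquet.read(file) entry point and project() builder
method (which are outside this diff); buildBatchReader and BatchType are hypothetical
placeholders for an engine-specific reader factory and its batch container:

    // Hypothetical usage of the batched read path added below. buildBatchReader
    // is a stand-in for a factory that maps a Parquet MessageType to a
    // VectorizedReader<BatchType> implementation.
    CloseableIterable<BatchType> batches = Parquet.read(inputFile)
        .project(expectedSchema)
        .createBatchedReaderFunc(fileSchema -> buildBatchReader(fileSchema))
        .recordsPerBatch(5000)  // optional; defaults to 10000
        .build();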

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index f03c5bc..689ddb1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -304,12 +304,14 @@ public class Parquet {
     private Schema schema = null;
     private Expression filter = null;
     private ReadSupport<?> readSupport = null;
+    private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
     private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
     private boolean filterRecords = true;
     private boolean caseSensitive = true;
     private Map<String, String> properties = Maps.newHashMap();
     private boolean callInit = false;
     private boolean reuseContainers = false;
+    private int maxRecordsPerBatch = 10000;
 
     private ReadBuilder(InputFile file) {
       this.file = file;
@@ -358,10 +360,19 @@ public class Parquet {
     }
 
     public ReadBuilder createReaderFunc(Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
+      Preconditions.checkArgument(this.batchedReaderFunc == null,
+          "Reader function cannot be set since the batched version is already set");
       this.readerFunc = newReaderFunction;
       return this;
     }
 
+    public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
+      Preconditions.checkArgument(this.readerFunc == null,
+          "Batched reader function cannot be set since the non-batched version is already set");
+      this.batchedReaderFunc = func;
+      return this;
+    }
+
     public ReadBuilder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -377,9 +388,14 @@ public class Parquet {
       return this;
     }
 
-    @SuppressWarnings("unchecked")
+    public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
+      this.maxRecordsPerBatch = numRowsPerBatch;
+      return this;
+    }
+
+    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
     public <D> CloseableIterable<D> build() {
-      if (readerFunc != null) {
+      if (readerFunc != null || batchedReaderFunc != null) {
         ParquetReadOptions.Builder optionsBuilder;
         if (file instanceof HadoopInputFile) {
           // remove read properties already set that may conflict with this read
@@ -402,8 +418,13 @@ public class Parquet {
 
         ParquetReadOptions options = optionsBuilder.build();
 
-        return new org.apache.iceberg.parquet.ParquetReader<>(
-            file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
+        if (batchedReaderFunc != null) {
+          return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers,
+              caseSensitive, maxRecordsPerBatch);
+        } else {
+          return new org.apache.iceberg.parquet.ParquetReader<>(
+              file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
+        }
       }
 
       ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 3b0198f..6dc1f6d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -40,8 +39,6 @@ import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -114,7 +111,7 @@ public class ParquetDictionaryRowGroupFilter {
         PrimitiveType colType = fileSchema.getType(meta.getPath().toArray()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
-          isFallback.put(id, hasNonDictionaryPages(meta));
+          isFallback.put(id, ParquetUtil.hasNonDictionaryPages(meta));
           mayContainNulls.put(id, mayContainNull(meta));
         }
       }
@@ -385,31 +382,4 @@ public class ParquetDictionaryRowGroupFilter {
     return meta.getStatistics() == null || meta.getStatistics().getNumNulls() != 0;
   }
 
-  @SuppressWarnings("deprecation")
-  private static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
-    EncodingStats stats = meta.getEncodingStats();
-    if (stats != null) {
-      return stats.hasNonDictionaryEncodedPages();
-    }
-
-    // without EncodingStats, fall back to testing the encoding list
-    Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
-    if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
-      // if remove returned true, PLAIN_DICTIONARY was present, which means at
-      // least one page was dictionary encoded and 1.0 encodings are used
-
-      // RLE and BIT_PACKED are only used for repetition or definition levels
-      encodings.remove(Encoding.RLE);
-      encodings.remove(Encoding.BIT_PACKED);
-
-      // when empty, no encodings other than dictionary or rep/def levels
-      return !encodings.isEmpty();
-    } else {
-      // if PLAIN_DICTIONARY wasn't present, then either the column is not
-      // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
-      // for 2.0, this cannot determine whether a page fell back without
-      // page encoding stats
-      return true;
-    }
-  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 80f092d..9ba36b3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.parquet;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
 import java.util.function.Function;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -34,7 +33,6 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.schema.MessageType;
 
 public class ParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
@@ -59,122 +57,15 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
     this.caseSensitive = caseSensitive;
   }
 
-  private static class ReadConf<T> {
-    private final ParquetFileReader reader;
-    private final InputFile file;
-    private final ParquetReadOptions options;
-    private final MessageType projection;
-    private final ParquetValueReader<T> model;
-    private final List<BlockMetaData> rowGroups;
-    private final boolean[] shouldSkip;
-    private final long totalValues;
-    private final boolean reuseContainers;
-
-    @SuppressWarnings("unchecked")
-    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
-             Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
-             boolean caseSensitive) {
-      this.file = file;
-      this.options = options;
-      this.reader = newReader(file, options);
-
-      MessageType fileSchema = reader.getFileMetaData().getSchema();
-
-      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
-      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
-
-      this.projection = hasIds ?
-          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
-          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
-      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
-      this.rowGroups = reader.getRowGroups();
-      this.shouldSkip = new boolean[rowGroups.size()];
-
-      ParquetMetricsRowGroupFilter statsFilter = null;
-      ParquetDictionaryRowGroupFilter dictFilter = null;
-      if (filter != null) {
-        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
-        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
-      }
-
-      long computedTotalValues = 0L;
-      for (int i = 0; i < shouldSkip.length; i += 1) {
-        BlockMetaData rowGroup = rowGroups.get(i);
-        boolean shouldRead = filter == null || (
-            statsFilter.shouldRead(typeWithIds, rowGroup) &&
-            dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
-        this.shouldSkip[i] = !shouldRead;
-        if (shouldRead) {
-          computedTotalValues += rowGroup.getRowCount();
-        }
-      }
-
-      this.totalValues = computedTotalValues;
-      this.reuseContainers = reuseContainers;
-    }
-
-    ReadConf(ReadConf<T> toCopy) {
-      this.reader = null;
-      this.file = toCopy.file;
-      this.options = toCopy.options;
-      this.projection = toCopy.projection;
-      this.model = toCopy.model;
-      this.rowGroups = toCopy.rowGroups;
-      this.shouldSkip = toCopy.shouldSkip;
-      this.totalValues = toCopy.totalValues;
-      this.reuseContainers = toCopy.reuseContainers;
-    }
-
-    ParquetFileReader reader() {
-      if (reader != null) {
-        reader.setRequestedSchema(projection);
-        return reader;
-      }
-
-      ParquetFileReader newReader = newReader(file, options);
-      newReader.setRequestedSchema(projection);
-      return newReader;
-    }
-
-    ParquetValueReader<T> model() {
-      return model;
-    }
-
-    boolean[] shouldSkip() {
-      return shouldSkip;
-    }
-
-    long totalValues() {
-      return totalValues;
-    }
-
-    boolean reuseContainers() {
-      return reuseContainers;
-    }
-
-    ReadConf<T> copy() {
-      return new ReadConf<>(this);
-    }
-
-    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
-      try {
-        return ParquetFileReader.open(ParquetIO.file(file), options);
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
-      }
-    }
-  }
-
   private ReadConf<T> conf = null;
 
   private ReadConf<T> init() {
     if (conf == null) {
       ReadConf<T> readConf = new ReadConf<>(
-          input, options, expectedSchema, filter, readerFunc, reuseContainers, caseSensitive);
+          input, options, expectedSchema, filter, readerFunc, null, reuseContainers, caseSensitive, null);
       this.conf = readConf.copy();
       return readConf;
     }
-
     return conf;
   }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 75cc8f6..674243c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +43,8 @@ import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.BinaryUtil;
 import org.apache.iceberg.util.UnicodeUtil;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -127,7 +130,6 @@ public class ParquetUtil {
         toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
   }
 
-
   /**
    * @return a list of offsets in ascending order determined by the starting position
    * of the row groups
@@ -225,4 +227,32 @@ public class ParquetUtil {
     }
     return bufferMap;
   }
+
+  @SuppressWarnings("deprecation")
+  public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
+    EncodingStats stats = meta.getEncodingStats();
+    if (stats != null) {
+      return stats.hasNonDictionaryEncodedPages();
+    }
+
+    // without EncodingStats, fall back to testing the encoding list
+    Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
+    if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
+      // if remove returned true, PLAIN_DICTIONARY was present, which means at
+      // least one page was dictionary encoded and 1.0 encodings are used
+
+      // RLE and BIT_PACKED are only used for repetition or definition levels
+      encodings.remove(Encoding.RLE);
+      encodings.remove(Encoding.BIT_PACKED);
+
+      // when empty, no encodings other than dictionary or rep/def levels
+      return !encodings.isEmpty();
+    } else {
+      // if PLAIN_DICTIONARY wasn't present, then either the column is not
+      // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
+      // for 2.0, this cannot determine whether a page fell back without
+      // page encoding stats
+      return true;
+    }
+  }
 }
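
hasNonDictionaryPages moves here unchanged and becomes public so that the batched read
path can also ask, per column chunk, whether dictionary decoding can be relied on. A
hedged sketch of a call site (blockMetaData is assumed to be one entry of
ParquetFileReader.getRowGroups()):

    // Sketch: detect whether any page of a column chunk fell back from
    // dictionary encoding. When EncodingStats are absent, the method
    // conservatively answers true for Parquet 2.0 encodings.
    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
      boolean fellBack = ParquetUtil.hasNonDictionaryPages(chunk);
      // fellBack == false means every data page is dictionary-encoded
    }
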
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
new file mode 100644
index 0000000..51a19d4
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+  private final ParquetFileReader reader;
+  private final InputFile file;
+  private final ParquetReadOptions options;
+  private final MessageType projection;
+  @Nullable
+  private final ParquetValueReader<T> model;
+  @Nullable
+  private final VectorizedReader<T> vectorizedModel;
+  private final List<BlockMetaData> rowGroups;
+  private final boolean[] shouldSkip;
+  private final long totalValues;
+  private final boolean reuseContainers;
+  @Nullable
+  private final Integer batchSize;
+
+  // List of column chunk metadata for each row group
+  private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
+
+  @SuppressWarnings("unchecked")
+  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+           Function<MessageType, ParquetValueReader<?>> readerFunc,
+           Function<MessageType, VectorizedReader<?>> batchedReaderFunc,
+           boolean reuseContainers, boolean caseSensitive, Integer bSize) {
+    this.file = file;
+    this.options = options;
+    this.reader = newReader(file, options);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+    MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+    this.projection = hasIds ?
+        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    this.rowGroups = reader.getRowGroups();
+    this.shouldSkip = new boolean[rowGroups.size()];
+
+    ParquetMetricsRowGroupFilter statsFilter = null;
+    ParquetDictionaryRowGroupFilter dictFilter = null;
+    if (filter != null) {
+      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+
+    long computedTotalValues = 0L;
+    for (int i = 0; i < shouldSkip.length; i += 1) {
+      BlockMetaData rowGroup = rowGroups.get(i);
+      boolean shouldRead = filter == null || (
+          statsFilter.shouldRead(typeWithIds, rowGroup) &&
+              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+      this.shouldSkip[i] = !shouldRead;
+      if (shouldRead) {
+        computedTotalValues += rowGroup.getRowCount();
+      }
+    }
+
+    this.totalValues = computedTotalValues;
+    if (readerFunc != null) {
+      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
+      this.vectorizedModel = null;
+      this.columnChunkMetaDataForRowGroups = null;
+    } else {
+      this.model = null;
+      this.vectorizedModel = (VectorizedReader<T>) batchedReaderFunc.apply(typeWithIds);
+      this.columnChunkMetaDataForRowGroups = getColumnChunkMetadataForRowGroups();
+    }
+
+    this.reuseContainers = reuseContainers;
+    this.batchSize = bSize;
+  }
+
+  private ReadConf(ReadConf<T> toCopy) {
+    this.reader = null;
+    this.file = toCopy.file;
+    this.options = toCopy.options;
+    this.projection = toCopy.projection;
+    this.model = toCopy.model;
+    this.rowGroups = toCopy.rowGroups;
+    this.shouldSkip = toCopy.shouldSkip;
+    this.totalValues = toCopy.totalValues;
+    this.reuseContainers = toCopy.reuseContainers;
+    this.batchSize = toCopy.batchSize;
+    this.vectorizedModel = toCopy.vectorizedModel;
+    this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
+  }
+
+  ParquetFileReader reader() {
+    if (reader != null) {
+      reader.setRequestedSchema(projection);
+      return reader;
+    }
+
+    ParquetFileReader newReader = newReader(file, options);
+    newReader.setRequestedSchema(projection);
+    return newReader;
+  }
+
+  ParquetValueReader<T> model() {
+    return model;
+  }
+
+  VectorizedReader<T> vectorizedModel() {
+    return vectorizedModel;
+  }
+
+  boolean[] shouldSkip() {
+    return shouldSkip;
+  }
+
+  long totalValues() {
+    return totalValues;
+  }
+
+  boolean reuseContainers() {
+    return reuseContainers;
+  }
+
+  Integer batchSize() {
+    return batchSize;
+  }
+
+  List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() {
+    return columnChunkMetaDataForRowGroups;
+  }
+
+  ReadConf<T> copy() {
+    return new ReadConf<>(this);
+  }
+
+  private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+    try {
+      return ParquetFileReader.open(ParquetIO.file(file), options);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+    }
+  }
+
+  private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowGroups() {
+    Set<ColumnPath> projectedColumns = projection.getColumns().stream()
+        .map(columnDescriptor -> ColumnPath.get(columnDescriptor.getPath())).collect(Collectors.toSet());
+    ImmutableList.Builder<Map<ColumnPath, ColumnChunkMetaData>> listBuilder = ImmutableList.builder();
+    for (int i = 0; i < rowGroups.size(); i++) {
+      if (!shouldSkip[i]) {
+        BlockMetaData blockMetaData = rowGroups.get(i);
+        ImmutableMap.Builder<ColumnPath, ColumnChunkMetaData> mapBuilder = ImmutableMap.builder();
+        blockMetaData.getColumns().stream()
+            .filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath()))
+            .forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData));
+        listBuilder.add(mapBuilder.build());
+      } else {
+        listBuilder.add(ImmutableMap.of());
+      }
+    }
+    return listBuilder.build();
+  }
+}
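
Note the reader()/copy() hand-off above: the ReadConf built during planning owns an open
ParquetFileReader, while the private copy constructor deliberately nulls it out, so the
cached copy reopens the file on demand. A sketch of the pattern as both readers in this
commit use it (in-package, with a hypothetical value type Record):

    // First call: plan the read and give the already-open reader to the first
    // iterator. Later calls: return the cached copy, whose reader() reopens
    // the file via newReader(file, options).
    ReadConf<Record> planned = new ReadConf<>(file, options, schema, filter,
        readerFunc, null, true /* reuseContainers */, true /* caseSensitive */, null);
    ParquetFileReader first = planned.reader();  // reader opened at planning time
    ReadConf<Record> cached = planned.copy();    // reader field is null in the copy
    ParquetFileReader second = cached.reader();  // reopens the file
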
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
new file mode 100644
index 0000000..6100979
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Implements a {@link ValuesReader} specifically to read a given number of bytes from the underlying
+ * {@link ByteBufferInputStream}.
+ */
+public class ValuesAsBytesReader extends ValuesReader {
+  private ByteBufferInputStream valuesInputStream = null;
+  // Only used for booleans.
+  private int bitOffset;
+  private byte currentByte = 0;
+
+  public ValuesAsBytesReader() {
+  }
+
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) {
+    this.valuesInputStream = in;
+  }
+
+  @Override
+  public void skip() {
+    throw new UnsupportedOperationException();
+  }
+
+  public ByteBuffer getBuffer(int length) {
+    try {
+      return valuesInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
+  @Override
+  public final int readInteger() {
+    return getBuffer(4).getInt();
+  }
+
+  @Override
+  public final boolean readBoolean() {
+    if (bitOffset == 0) {
+      currentByte = getByte();
+    }
+
+    boolean value = (currentByte & (1 << bitOffset)) != 0;
+    bitOffset += 1;
+    if (bitOffset == 8) {
+      bitOffset = 0;
+    }
+    return value;
+  }
+
+  private byte getByte() {
+    try {
+      return (byte) valuesInputStream.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+  }
+}
+
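
readBoolean() above unpacks Parquet's PLAIN boolean encoding, which packs eight values
per byte starting from the least-significant bit. A standalone illustration of the same
bit extraction:

    // LSB-first unpacking, as in ValuesAsBytesReader.readBoolean(): bit 0 of
    // each byte is the first boolean, bit 7 the eighth.
    byte packed = 0b0000_0101;  // decodes as true, false, true, false, ...
    for (int bitOffset = 0; bitOffset < 8; bitOffset++) {
      boolean value = (packed & (1 << bitOffset)) != 0;
      System.out.println("bit " + bitOffset + " = " + value);
    }
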
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
new file mode 100644
index 0000000..585f995
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
+  private final Expression filter;
+  private boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<?>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, null, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+    return conf;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    FileIterator<T> iter = new FileIterator<>(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+    private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.vectorizedModel();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+      this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      long numValuesToRead = Math.min(nextRowGroupStart - valuesRead, batchSize);
+      // numValuesToRead is at most batchSize, which is an int, so the cast is safe
+      this.last = model.read((int) numValuesToRead);
+      valuesRead += numValuesToRead;
+      return last;
+    }
+
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+      PageReadStore pages;
+      try {
+        pages = reader.readNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+      model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
+      nextRowGroupStart += pages.getRowCount();
+      nextRowGroup += 1;
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+}
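
FileIterator sizes each read as min(rows left in the current row group, batchSize), so a
batch never spans row groups. A hedged consumption sketch (MyBatch and process() are
hypothetical; the batch type is whatever the supplied VectorizedReader produces):

    // Iterate batches of at most 10000 rows each; closing the iterable closes
    // the underlying ParquetFileReader.
    try (CloseableIterable<MyBatch> batches = new VectorizedParquetReader<>(
        input, schema, options, readerFunc, filter,
        true /* reuseContainers */, true /* caseSensitive */, 10000)) {
      for (MyBatch batch : batches) {
        process(batch);
      }
    }
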
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
new file mode 100644
index 0000000..50179ec
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader<T> {
+
+  /**
+   * Reads a batch of {@code numRows} rows.
+   *
+   * @param numRows number of rows to read
+   * @return batch of records of type T
+   */
+  T read(int numRows);
+
+  /**
+   * Sets the row group information to be used by this reader.
+   *
+   * @param pages row group information for all the columns
+   * @param metadata map of {@link ColumnPath} -> {@link ColumnChunkMetaData} for the row group
+   */
+  void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata);
+
+  /**
+   * Sets up the reader to reuse the underlying containers used for storing batches.
+   *
+   * @param reuse whether to reuse the underlying containers across batches
+   */
+  void reuseContainers(boolean reuse);
+}
+
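
For reference, a minimal skeleton of the contract, with a deliberately trivial batch
type; concrete engine-specific implementations live outside this commit:

    // Hypothetical skeleton showing the call sequence the readers above rely
    // on: setRowGroupInfo() once per row group, then read() repeatedly.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnPath;

    public class DummyBatchReader implements VectorizedReader<List<Object[]>> {
      private boolean reuse;

      @Override
      public List<Object[]> read(int numRows) {
        // decode up to numRows values per projected column into a batch
        return new ArrayList<>(numRows);
      }

      @Override
      public void setRowGroupInfo(PageReadStore pages,
                                  Map<ColumnPath, ColumnChunkMetaData> metadata) {
        // reset per-column page readers from the new row group's pages
      }

      @Override
      public void reuseContainers(boolean reuse) {
        this.reuse = reuse;  // when true, read() may return the same container
      }
    }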