You are viewing a plain-text version of this content. The canonical link for it is on the original archive page (the hyperlink was lost in this plain-text copy).
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/30 17:50:08 UTC
[incubator-iceberg] branch master updated: Vectorization: Parquet additions to support batch reads (#710)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3c88ef1 Vectorization: Parquet additions to support batch reads (#710)
3c88ef1 is described below
commit 3c88ef1d89800dde9edb61e59b851e64ffef0f33
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Mon Dec 30 09:50:00 2019 -0800
Vectorization: Parquet additions to support batch reads (#710)
Co-authored-by: gautamkowshik@gmail.com
Co-authored-by: anjalinorwood@gmail.com
---
.../java/org/apache/iceberg/parquet/Parquet.java | 29 ++-
.../parquet/ParquetDictionaryRowGroupFilter.java | 32 +---
.../org/apache/iceberg/parquet/ParquetReader.java | 111 +----------
.../org/apache/iceberg/parquet/ParquetUtil.java | 32 +++-
.../java/org/apache/iceberg/parquet/ReadConf.java | 203 +++++++++++++++++++++
.../iceberg/parquet/ValuesAsBytesReader.java | 87 +++++++++
.../iceberg/parquet/VectorizedParquetReader.java | 150 +++++++++++++++
.../apache/iceberg/parquet/VectorizedReader.java | 51 ++++++
8 files changed, 549 insertions(+), 146 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index f03c5bc..689ddb1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -304,12 +304,14 @@ public class Parquet {
private Schema schema = null;
private Expression filter = null;
private ReadSupport<?> readSupport = null;
+ private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
private boolean filterRecords = true;
private boolean caseSensitive = true;
private Map<String, String> properties = Maps.newHashMap();
private boolean callInit = false;
private boolean reuseContainers = false;
+ private int maxRecordsPerBatch = 10000;
private ReadBuilder(InputFile file) {
this.file = file;
@@ -358,10 +360,19 @@ public class Parquet {
}
+ /**
+ * Sets the function that builds a row-based {@link ParquetValueReader} from the file schema.
+ * Cannot be combined with {@link #createBatchedReaderFunc(Function)}; setting both is rejected.
+ */
public ReadBuilder createReaderFunc(Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
+ Preconditions.checkArgument(this.batchedReaderFunc == null,
+ "Reader function cannot be set since the batched version is already set");
this.readerFunc = newReaderFunction;
return this;
}
+ public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
+ Preconditions.checkArgument(this.readerFunc == null,
+ "Batched reader function cannot be set since the non-batched version is already set");
+ this.batchedReaderFunc = func;
+ return this;
+ }
+
public ReadBuilder set(String key, String value) {
properties.put(key, value);
return this;
@@ -377,9 +388,14 @@ public class Parquet {
return this;
}
- @SuppressWarnings("unchecked")
+ /**
+ * Sets the maximum number of rows in each batch returned by a batched reader. Only used when a batched
+ * reader function is configured.
+ *
+ * @param numRowsPerBatch maximum rows per batch; must be positive
+ * @return this builder for method chaining
+ * @throws IllegalArgumentException if numRowsPerBatch is not positive
+ */
+ public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
+ // a non-positive batch size would make the batched iterator return empty batches forever
+ Preconditions.checkArgument(numRowsPerBatch > 0,
+ "Invalid number of records per batch (must be > 0): %s", numRowsPerBatch);
+ this.maxRecordsPerBatch = numRowsPerBatch;
+ return this;
+ }
+
+ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
public <D> CloseableIterable<D> build() {
- if (readerFunc != null) {
+ if (readerFunc != null || batchedReaderFunc != null) {
ParquetReadOptions.Builder optionsBuilder;
if (file instanceof HadoopInputFile) {
// remove read properties already set that may conflict with this read
@@ -402,8 +418,13 @@ public class Parquet {
ParquetReadOptions options = optionsBuilder.build();
- return new org.apache.iceberg.parquet.ParquetReader<>(
- file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
+ if (batchedReaderFunc != null) {
+ return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers,
+ caseSensitive, maxRecordsPerBatch);
+ } else {
+ return new org.apache.iceberg.parquet.ParquetReader<>(
+ file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
+ }
}
ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 3b0198f..6dc1f6d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -40,8 +39,6 @@ import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -114,7 +111,7 @@ public class ParquetDictionaryRowGroupFilter {
PrimitiveType colType = fileSchema.getType(meta.getPath().toArray()).asPrimitiveType();
if (colType.getId() != null) {
int id = colType.getId().intValue();
- isFallback.put(id, hasNonDictionaryPages(meta));
+ isFallback.put(id, ParquetUtil.hasNonDictionaryPages(meta));
mayContainNulls.put(id, mayContainNull(meta));
}
}
@@ -385,31 +382,4 @@ public class ParquetDictionaryRowGroupFilter {
return meta.getStatistics() == null || meta.getStatistics().getNumNulls() != 0;
}
- @SuppressWarnings("deprecation")
- private static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
- EncodingStats stats = meta.getEncodingStats();
- if (stats != null) {
- return stats.hasNonDictionaryEncodedPages();
- }
-
- // without EncodingStats, fall back to testing the encoding list
- Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
- if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
- // if remove returned true, PLAIN_DICTIONARY was present, which means at
- // least one page was dictionary encoded and 1.0 encodings are used
-
- // RLE and BIT_PACKED are only used for repetition or definition levels
- encodings.remove(Encoding.RLE);
- encodings.remove(Encoding.BIT_PACKED);
-
- // when empty, no encodings other than dictionary or rep/def levels
- return !encodings.isEmpty();
- } else {
- // if PLAIN_DICTIONARY wasn't present, then either the column is not
- // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
- // for 2.0, this cannot determine whether a page fell back without
- // page encoding stats
- return true;
- }
- }
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 80f092d..9ba36b3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.parquet;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -34,7 +33,6 @@ import org.apache.iceberg.io.InputFile;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;
public class ParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
@@ -59,122 +57,15 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
this.caseSensitive = caseSensitive;
}
- private static class ReadConf<T> {
- private final ParquetFileReader reader;
- private final InputFile file;
- private final ParquetReadOptions options;
- private final MessageType projection;
- private final ParquetValueReader<T> model;
- private final List<BlockMetaData> rowGroups;
- private final boolean[] shouldSkip;
- private final long totalValues;
- private final boolean reuseContainers;
-
- @SuppressWarnings("unchecked")
- ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
- Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
- boolean caseSensitive) {
- this.file = file;
- this.options = options;
- this.reader = newReader(file, options);
-
- MessageType fileSchema = reader.getFileMetaData().getSchema();
-
- boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
- MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
-
- this.projection = hasIds ?
- ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
- ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
- this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
- this.rowGroups = reader.getRowGroups();
- this.shouldSkip = new boolean[rowGroups.size()];
-
- ParquetMetricsRowGroupFilter statsFilter = null;
- ParquetDictionaryRowGroupFilter dictFilter = null;
- if (filter != null) {
- statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
- dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
- }
-
- long computedTotalValues = 0L;
- for (int i = 0; i < shouldSkip.length; i += 1) {
- BlockMetaData rowGroup = rowGroups.get(i);
- boolean shouldRead = filter == null || (
- statsFilter.shouldRead(typeWithIds, rowGroup) &&
- dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
- this.shouldSkip[i] = !shouldRead;
- if (shouldRead) {
- computedTotalValues += rowGroup.getRowCount();
- }
- }
-
- this.totalValues = computedTotalValues;
- this.reuseContainers = reuseContainers;
- }
-
- ReadConf(ReadConf<T> toCopy) {
- this.reader = null;
- this.file = toCopy.file;
- this.options = toCopy.options;
- this.projection = toCopy.projection;
- this.model = toCopy.model;
- this.rowGroups = toCopy.rowGroups;
- this.shouldSkip = toCopy.shouldSkip;
- this.totalValues = toCopy.totalValues;
- this.reuseContainers = toCopy.reuseContainers;
- }
-
- ParquetFileReader reader() {
- if (reader != null) {
- reader.setRequestedSchema(projection);
- return reader;
- }
-
- ParquetFileReader newReader = newReader(file, options);
- newReader.setRequestedSchema(projection);
- return newReader;
- }
-
- ParquetValueReader<T> model() {
- return model;
- }
-
- boolean[] shouldSkip() {
- return shouldSkip;
- }
-
- long totalValues() {
- return totalValues;
- }
-
- boolean reuseContainers() {
- return reuseContainers;
- }
-
- ReadConf<T> copy() {
- return new ReadConf<>(this);
- }
-
- private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
- try {
- return ParquetFileReader.open(ParquetIO.file(file), options);
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
- }
- }
- }
-
private ReadConf<T> conf = null;
+ /**
+ * Lazily creates the read configuration. The first call returns the instance that owns the file reader opened
+ * while building it and caches a copy; later calls return the cached copy, whose reader() opens a new file reader.
+ */
private ReadConf<T> init() {
if (conf == null) {
+ // row-based read: no batched reader function and no batch size
ReadConf<T> readConf = new ReadConf<>(
+ input, options, expectedSchema, filter, readerFunc, null, reuseContainers, caseSensitive, null);
this.conf = readConf.copy();
return readConf;
}
-
return conf;
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 75cc8f6..674243c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,8 @@ import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BinaryUtil;
import org.apache.iceberg.util.UnicodeUtil;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -127,7 +130,6 @@ public class ParquetUtil {
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
}
-
/**
* @return a list of offsets in ascending order determined by the starting position
* of the row groups
@@ -225,4 +227,32 @@ public class ParquetUtil {
}
return bufferMap;
}
+
+ @SuppressWarnings("deprecation")
+ public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
+ EncodingStats stats = meta.getEncodingStats();
+ if (stats != null) {
+ return stats.hasNonDictionaryEncodedPages();
+ }
+
+ // without EncodingStats, fall back to testing the encoding list
+ Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
+ if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
+ // if remove returned true, PLAIN_DICTIONARY was present, which means at
+ // least one page was dictionary encoded and 1.0 encodings are used
+
+ // RLE and BIT_PACKED are only used for repetition or definition levels
+ encodings.remove(Encoding.RLE);
+ encodings.remove(Encoding.BIT_PACKED);
+
+ // when empty, no encodings other than dictionary or rep/def levels
+ return !encodings.isEmpty();
+ } else {
+ // if PLAIN_DICTIONARY wasn't present, then either the column is not
+ // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
+ // for 2.0, this cannot determine whether a page fell back without
+ // page encoding stats
+ return true;
+ }
+ }
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
new file mode 100644
index 0000000..51a19d4
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+ private final ParquetFileReader reader;
+ private final InputFile file;
+ private final ParquetReadOptions options;
+ private final MessageType projection;
+ @Nullable
+ private final ParquetValueReader<T> model;
+ @Nullable
+ private final VectorizedReader<T> vectorizedModel;
+ private final List<BlockMetaData> rowGroups;
+ private final boolean[] shouldSkip;
+ private final long totalValues;
+ private final boolean reuseContainers;
+ @Nullable
+ private final Integer batchSize;
+
+ // List of column chunk metadata for each row group
+ private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
+
+ @SuppressWarnings("unchecked")
+ ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+ Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
+ VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+ boolean caseSensitive, Integer bSize) {
+ this.file = file;
+ this.options = options;
+ this.reader = newReader(file, options);
+ MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+ boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+ MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+ this.projection = hasIds ?
+ ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+ ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+ this.rowGroups = reader.getRowGroups();
+ this.shouldSkip = new boolean[rowGroups.size()];
+
+ ParquetMetricsRowGroupFilter statsFilter = null;
+ ParquetDictionaryRowGroupFilter dictFilter = null;
+ if (filter != null) {
+ statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+ dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+ }
+
+ long computedTotalValues = 0L;
+ for (int i = 0; i < shouldSkip.length; i += 1) {
+ BlockMetaData rowGroup = rowGroups.get(i);
+ boolean shouldRead = filter == null || (
+ statsFilter.shouldRead(typeWithIds, rowGroup) &&
+ dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+ this.shouldSkip[i] = !shouldRead;
+ if (shouldRead) {
+ computedTotalValues += rowGroup.getRowCount();
+ }
+ }
+
+ this.totalValues = computedTotalValues;
+ if (readerFunc != null) {
+ this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
+ this.vectorizedModel = null;
+ this.columnChunkMetaDataForRowGroups = null;
+ } else {
+ this.model = null;
+ this.vectorizedModel = (VectorizedReader<T>) batchedReaderFunc.apply(typeWithIds);
+ this.columnChunkMetaDataForRowGroups = getColumnChunkMetadataForRowGroups();
+ }
+
+ this.reuseContainers = reuseContainers;
+ this.batchSize = bSize;
+ }
+
+ private ReadConf(ReadConf<T> toCopy) {
+ this.reader = null;
+ this.file = toCopy.file;
+ this.options = toCopy.options;
+ this.projection = toCopy.projection;
+ this.model = toCopy.model;
+ this.rowGroups = toCopy.rowGroups;
+ this.shouldSkip = toCopy.shouldSkip;
+ this.totalValues = toCopy.totalValues;
+ this.reuseContainers = toCopy.reuseContainers;
+ this.batchSize = toCopy.batchSize;
+ this.vectorizedModel = toCopy.vectorizedModel;
+ this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
+ }
+
+ ParquetFileReader reader() {
+ if (reader != null) {
+ reader.setRequestedSchema(projection);
+ return reader;
+ }
+
+ ParquetFileReader newReader = newReader(file, options);
+ newReader.setRequestedSchema(projection);
+ return newReader;
+ }
+
+ ParquetValueReader<T> model() {
+ return model;
+ }
+
+ VectorizedReader<T> vectorizedModel() {
+ return vectorizedModel;
+ }
+
+ boolean[] shouldSkip() {
+ return shouldSkip;
+ }
+
+ long totalValues() {
+ return totalValues;
+ }
+
+ boolean reuseContainers() {
+ return reuseContainers;
+ }
+
+ Integer batchSize() {
+ return batchSize;
+ }
+
+ List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() {
+ return columnChunkMetaDataForRowGroups;
+ }
+
+ ReadConf<T> copy() {
+ return new ReadConf<>(this);
+ }
+
+ private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+ try {
+ return ParquetFileReader.open(ParquetIO.file(file), options);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+ }
+ }
+
+ private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowGroups() {
+ Set<ColumnPath> projectedColumns = projection.getColumns().stream()
+ .map(columnDescriptor -> ColumnPath.get(columnDescriptor.getPath())).collect(Collectors.toSet());
+ ImmutableList.Builder<Map<ColumnPath, ColumnChunkMetaData>> listBuilder = ImmutableList.builder();
+ for (int i = 0; i < rowGroups.size(); i++) {
+ if (!shouldSkip[i]) {
+ BlockMetaData blockMetaData = rowGroups.get(i);
+ ImmutableMap.Builder<ColumnPath, ColumnChunkMetaData> mapBuilder = ImmutableMap.builder();
+ blockMetaData.getColumns().stream()
+ .filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath()))
+ .forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData));
+ listBuilder.add(mapBuilder.build());
+ } else {
+ listBuilder.add(ImmutableMap.of());
+ }
+ }
+ return listBuilder.build();
+ }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
new file mode 100644
index 0000000..6100979
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Implements a {@link ValuesReader} specifically to read given number of bytes from the underlying {@link
+ * ByteBufferInputStream}.
+ * <p>
+ * Integers are decoded as 4-byte little-endian values. Booleans are decoded one bit at a time, least significant
+ * bit first; the bit position is reset whenever a new page is initialized.
+ */
+public class ValuesAsBytesReader extends ValuesReader {
+  private ByteBufferInputStream valuesInputStream = null;
+  // Only used for booleans: position of the next bit to read within currentByte.
+  private int bitOffset;
+  private byte currentByte = 0;
+
+  public ValuesAsBytesReader() {
+  }
+
+  /**
+   * Starts reading values from a new page. The boolean bit-unpacking state is reset because PLAIN-encoded
+   * booleans are bit-packed per page, so a new page never continues a partially consumed byte from the last one.
+   */
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) {
+    this.valuesInputStream = in;
+    this.bitOffset = 0;
+    this.currentByte = 0;
+  }
+
+  @Override
+  public void skip() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns a little-endian view of the next {@code length} bytes and advances past them.
+   *
+   * @throws ParquetDecodingException if fewer than {@code length} bytes remain
+   */
+  public ByteBuffer getBuffer(int length) {
+    try {
+      return valuesInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
+  @Override
+  public final int readInteger() {
+    return getBuffer(4).getInt();
+  }
+
+  @Override
+  public final boolean readBoolean() {
+    if (bitOffset == 0) {
+      currentByte = getByte();
+    }
+
+    boolean value = (currentByte & (1 << bitOffset)) != 0;
+    bitOffset += 1;
+    if (bitOffset == 8) {
+      bitOffset = 0;
+    }
+    return value;
+  }
+
+  private byte getByte() {
+    int value;
+    try {
+      value = valuesInputStream.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+    if (value < 0) {
+      // read() signals end of stream with -1; casting that to a byte would silently decode as 0xFF
+      throw new ParquetDecodingException("Failed to read a byte: end of stream");
+    }
+    return (byte) value;
+  }
+}
+
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
new file mode 100644
index 0000000..585f995
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * A {@link CloseableIterable} that reads a Parquet file in batches using a {@link VectorizedReader}.
+ * <p>
+ * Each element produced by the iterator is one batch returned by {@link VectorizedReader#read(int)}. A batch holds
+ * at most the configured batch size rows and never spans two row groups.
+ *
+ * @param <T> type of the batches produced
+ */
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  /**
+   * @param input file to read
+   * @param expectedSchema Iceberg schema to project
+   * @param options Parquet read options
+   * @param readerFunc builds the batched reader from the file schema
+   * @param filter row-group filter; alwaysTrue is treated as no filter
+   * @param reuseContainers whether the batched reader may reuse its containers
+   * @param caseSensitive whether filter column names are matched case-sensitively
+   * @param maxRecordsPerBatch maximum rows per batch; must be positive
+   * @throws IllegalArgumentException if maxRecordsPerBatch is not positive
+   */
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<?>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    // a non-positive batch size would read 0 rows per call and never reach totalValues
+    if (maxRecordsPerBatch <= 0) {
+      throw new IllegalArgumentException("Invalid batch size (must be > 0): " + maxRecordsPerBatch);
+    }
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private ReadConf<T> conf = null;
+
+  /**
+   * Lazily creates the read configuration. The first call returns the instance that owns the already-opened file
+   * reader and caches a copy; later calls return the copy, which opens a new file reader on demand.
+   */
+  private ReadConf<T> init() {
+    if (conf == null) {
+      ReadConf<T> readConf = new ReadConf<>(
+          input, options, expectedSchema, filter, null, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+    return conf;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    FileIterator<T> iter = new FileIterator<>(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+    private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf<T> conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.vectorizedModel();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+      this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        // the current row group is exhausted; move on to the next one that was not filtered out
+        advance();
+      }
+      long numValuesToRead = Math.min(nextRowGroupStart - valuesRead, batchSize);
+      // batchSize is an integer, so casting to integer is safe
+      this.last = model.read((int) numValuesToRead);
+      valuesRead += numValuesToRead;
+      return last;
+    }
+
+    /** Skips filtered-out row groups, reads the next row group and hands it to the batched reader. */
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+      PageReadStore pages;
+      try {
+        pages = reader.readNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+      model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
+      nextRowGroupStart += pages.getRowCount();
+      nextRowGroup += 1;
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
new file mode 100644
index 0000000..50179ec
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ *
+ * @param <T> type of the batch produced by {@link #read(int)}
+ */
+public interface VectorizedReader<T> {
+
+  /**
+   * Reads a batch of {@code numRows} rows from the current row group.
+   *
+   * @param numRows number of rows to read
+   * @return batch of records of type {@code T}
+   */
+  T read(int numRows);
+
+  /**
+   * Sets the row group that subsequent calls to {@link #read(int)} read from.
+   *
+   * @param pages row group information for all the columns
+   * @param metadata map from {@link ColumnPath} to {@link ColumnChunkMetaData} for the row group
+   */
+  void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata);
+
+  /**
+   * Sets up the reader to reuse the underlying containers used for storing batches.
+   *
+   * @param reuse whether containers may be reused across batches
+   */
+  void reuseContainers(boolean reuse);
+}
+