Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/20 23:42:04 UTC
[incubator-iceberg] branch vectorized-read updated: Clean up code,
rename classes, add unit tests for dictionary encoded data (#664)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
View the commit online:
https://github.com/apache/incubator-iceberg/commit/25c7e4a4a8d3d57cfc76dcb45ea4434f0ffc9895
The following commit(s) were added to refs/heads/vectorized-read by this push:
new 25c7e4a Clean up code, rename classes, add unit tests for dictionary encoded data (#664)
25c7e4a is described below
commit 25c7e4a4a8d3d57cfc76dcb45ea4434f0ffc9895
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Wed Nov 20 15:41:56 2019 -0800
Clean up code, rename classes, add unit tests for dictionary encoded data (#664)
---
.../org/apache/iceberg/parquet/BatchedReader.java | 7 -
.../java/org/apache/iceberg/parquet/Parquet.java | 5 +-
.../org/apache/iceberg/parquet/ParquetReader.java | 17 +-
.../iceberg/parquet/VectorizedValuesReader.java | 1304 --------------------
.../parquet/arrow/IcebergArrowColumnVector.java | 22 +-
.../parquet/arrow/IcebergDecimalArrowVector.java | 21 +-
.../parquet/arrow/IcebergVarBinaryArrowVector.java | 3 +-
.../parquet/arrow/IcebergVarcharArrowVector.java | 5 +-
.../ColumnarBatchReaders.java} | 31 +-
.../{ => vectorized}/NullabilityHolder.java | 2 +-
.../iceberg/parquet/vectorized/VectorHolder.java | 64 +
.../VectorizedArrowReader.java} | 93 +-
.../VectorizedColumnIterator.java} | 62 +-
.../VectorizedPageIterator.java} | 62 +-
.../vectorized/VectorizedParquetValuesReader.java | 1233 ++++++++++++++++++
.../VectorizedReader.java} | 40 +-
...ectorizedDictionaryEncodedStringsBenchmark.java | 7 +-
...edFallbackToPlainEncodingStringsBenchmark.java} | 60 +-
.../data/vector/VectorizedSparkParquetReaders.java | 248 ++--
.../org/apache/iceberg/spark/source/Reader.java | 4 +-
.../apache/iceberg/spark/data/AvroDataTest.java | 6 +-
.../apache/iceberg/spark/data/DictionaryData.java | 289 +++++
.../org/apache/iceberg/spark/data/TestHelpers.java | 33 +-
...kParquetDictionaryEncodedVectorizedReader.java} | 14 +-
...ackToDictionaryEncodingForVectorizedReader.java | 1 -
.../data/TestSparkParquetVectorizedReader.java | 23 +-
26 files changed, 1938 insertions(+), 1718 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/BatchedReader.java
deleted file mode 100644
index 335636e..0000000
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedReader.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.iceberg.parquet;
-
-/**
- * Marker interface for vectorized Iceberg readers.
- */
-public interface BatchedReader {
-}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 5b8d53c..9d83ce8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.vectorized.VectorizedReader;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroReadSupport;
@@ -287,7 +288,7 @@ public class Parquet {
private StructType sparkSchema = null;
private Expression filter = null;
private ReadSupport<?> readSupport = null;
- private Function<MessageType, BatchedReader> readerFunc = null;
+ private Function<MessageType, VectorizedReader> readerFunc = null;
private boolean filterRecords = true;
private boolean caseSensitive = true;
private Map<String, String> properties = Maps.newHashMap();
@@ -347,7 +348,7 @@ public class Parquet {
return this;
}
- public ReadBuilder createReaderFunc(Function<MessageType, BatchedReader> readerFunc) {
+ public ReadBuilder createReaderFunc(Function<MessageType, VectorizedReader> readerFunc) {
this.readerFunc = readerFunc;
return this;
}
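For orientation, a minimal sketch of how a caller wires the renamed hook above. The buildReader factory and its argument list are assumptions based on the VectorizedSparkParquetReaders class touched later in this commit, not a confirmed signature:

    // Hypothetical wiring of the vectorized read path; buildReader and its
    // arguments are assumed for illustration.
    CloseableIterable<ColumnarBatch> batches = Parquet.read(inputFile)
        .project(expectedSchema)
        .createReaderFunc(fileSchema ->
            VectorizedSparkParquetReaders.buildReader(expectedSchema, fileSchema))
        .build();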
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 5a7ad4c..b525454 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.*;
import java.util.function.Function;
-import com.google.common.collect.ImmutableList;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
@@ -32,6 +31,8 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.vectorized.ColumnarBatchReaders;
+import org.apache.iceberg.parquet.vectorized.VectorizedReader;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
@@ -54,14 +55,14 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final InputFile input;
private final Schema expectedSchema;
private final ParquetReadOptions options;
- private final Function<MessageType, BatchedReader> readerFunc;
+ private final Function<MessageType, VectorizedReader> readerFunc;
private final Expression filter;
private final boolean reuseContainers;
private final boolean caseSensitive;
private static final Logger LOG = LoggerFactory.getLogger(ParquetReader.class);
public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
- Function<MessageType, BatchedReader> readerFunc,
+ Function<MessageType, VectorizedReader> readerFunc,
Expression filter, boolean reuseContainers, boolean caseSensitive,
StructType sparkSchema, int maxRecordsPerBatch) {
this.input = input;
@@ -79,7 +80,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final InputFile file;
private final ParquetReadOptions options;
private final MessageType projection;
- private final ColumnarBatchReader model;
+ private final ColumnarBatchReaders model;
private final List<BlockMetaData> rowGroups;
private final boolean[] shouldSkip;
private final long totalValues;
@@ -87,8 +88,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
@SuppressWarnings("unchecked")
ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
- Function<MessageType, ColumnarBatchReader> readerFunc, boolean reuseContainers,
- boolean caseSensitive) {
+ Function<MessageType, ColumnarBatchReaders> readerFunc, boolean reuseContainers,
+ boolean caseSensitive) {
this.file = file;
this.options = options;
this.reader = newReader(file, options);
@@ -152,7 +153,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
return newReader;
}
- ColumnarBatchReader model() {
+ ColumnarBatchReaders model() {
return model;
}
@@ -206,7 +207,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private static class FileIterator implements Iterator, Closeable {
private final ParquetFileReader reader;
private final boolean[] shouldSkip;
- private final ColumnarBatchReader model;
+ private final ColumnarBatchReaders model;
private final long totalValues;
private final boolean reuseContainers;
//private final List<BlockMetaData> blockMetaDataList;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java
deleted file mode 100644
index b7ebfed..0000000
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedValuesReader.java
+++ /dev/null
@@ -1,1304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iceberg.parquet;
-
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.*;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.bitpacking.BytePacker;
-import org.apache.parquet.column.values.bitpacking.Packer;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * A values reader for Parquet's run-length encoded data. This is based off of the version in
- * parquet-mr with these changes:
- * - Supports the vectorized interface.
- * - Works on byte arrays(byte[]) instead of making byte streams.
- * <p>
- * This encoding is used in multiple places:
- * - Definition/Repetition levels
- * - Dictionary ids.
- */
-public final class VectorizedValuesReader extends ValuesReader {
- // Current decoding mode. The encoded data contains groups of either run length encoded data
- // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
- // the number of values in the group.
- // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
- private enum MODE {
- RLE,
- PACKED
- }
-
- // Encoded data.
- private ByteBufferInputStream in;
-
- // bit/byte width of decoded data and utility to batch unpack them.
- private int bitWidth;
- private int bytesWidth;
- private BytePacker packer;
-
- // Current decoding mode and values
- private MODE mode;
- private int currentCount;
- private int currentValue;
-
- // Buffer of decoded values if the values are PACKED.
- private int[] currentBuffer = new int[16];
- private int currentBufferIdx = 0;
-
- // If true, the bit width is fixed. This decoder is used in different places and this also
- // controls if we need to read the bitwidth from the beginning of the data stream.
- private final boolean fixedWidth;
- private final boolean readLength;
- private final int maxDefLevel;
-
- public VectorizedValuesReader(int maxDefLevel) {
- this.maxDefLevel = maxDefLevel;
- this.fixedWidth = false;
- this.readLength = false;
- }
-
- public VectorizedValuesReader(
- int bitWidth,
- int maxDefLevel) {
- this.fixedWidth = true;
- this.readLength = bitWidth != 0;
- this.maxDefLevel = maxDefLevel;
- init(bitWidth);
- }
-
- public VectorizedValuesReader(
- int bitWidth,
- boolean readLength,
- int maxDefLevel) {
- this.fixedWidth = true;
- this.readLength = readLength;
- this.maxDefLevel = maxDefLevel;
- init(bitWidth);
- }
-
- @Override
- public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
- this.in = in;
- if (fixedWidth) {
- // initialize for repetition and definition levels
- if (readLength) {
- int length = readIntLittleEndian();
- this.in = in.sliceStream(length);
- }
- } else {
- // initialize for values
- if (in.available() > 0) {
- init(in.read());
- }
- }
- if (bitWidth == 0) {
- // 0 bit width, treat this as an RLE run of valueCount number of 0's.
- this.mode = MODE.RLE;
- this.currentCount = valueCount;
- this.currentValue = 0;
- } else {
- this.currentCount = 0;
- }
- }
-
-
-
- /**
- * Initializes the internal state for decoding ints of `bitWidth`.
- */
- private void init(int bitWidth) {
- Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
- this.bitWidth = bitWidth;
- this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
- this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
- }
-
- /**
- * Reads the next varint encoded int.
- */
- private int readUnsignedVarInt() throws IOException {
- int value = 0;
- int shift = 0;
- int b;
- do {
- b = in.read();
- value |= (b & 0x7F) << shift;
- shift += 7;
- } while ((b & 0x80) != 0);
- return value;
- }
-
- /**
- * Reads the next 4 byte little endian int.
- */
- private int readIntLittleEndian() throws IOException {
- int ch4 = in.read();
- int ch3 = in.read();
- int ch2 = in.read();
- int ch1 = in.read();
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
-
- /**
- * Reads the next byteWidth little endian int.
- */
- private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
- switch (bytesWidth) {
- case 0:
- return 0;
- case 1:
- return in.read();
- case 2: {
- int ch2 = in.read();
- int ch1 = in.read();
- return (ch1 << 8) + ch2;
- }
- case 3: {
- int ch3 = in.read();
- int ch2 = in.read();
- int ch1 = in.read();
- return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
- }
- case 4: {
- return readIntLittleEndian();
- }
- }
- throw new RuntimeException("Unreachable");
- }
-
- private int ceil8(int value) {
- return (value + 7) / 8;
- }
-
- /**
- * Reads the next group.
- */
- private void readNextGroup() {
- try {
- int header = readUnsignedVarInt();
- this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
- switch (mode) {
- case RLE:
- this.currentCount = header >>> 1;
- this.currentValue = readIntLittleEndianPaddedOnBitWidth();
- return;
- case PACKED:
- int numGroups = header >>> 1;
- this.currentCount = numGroups * 8;
-
- if (this.currentBuffer.length < this.currentCount) {
- this.currentBuffer = new int[this.currentCount];
- }
- currentBufferIdx = 0;
- int valueIndex = 0;
- while (valueIndex < this.currentCount) {
- // values are bit packed 8 at a time, so reading bitWidth will always work
- ByteBuffer buffer = in.slice(bitWidth);
- this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
- valueIndex += 8;
- }
- return;
- default:
- throw new ParquetDecodingException("not a valid mode " + this.mode);
- }
- } catch (IOException e) {
- throw new ParquetDecodingException("Failed to read from input stream", e);
- }
- }
-
- @Override
- public boolean readBoolean() {
- return this.readInteger() != 0;
- }
-
- @Override
- public void skip() {
- this.readInteger();
- }
-
- @Override
- public int readValueDictionaryId() {
- return readInteger();
- }
-
- @Override
- public int readInteger() {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
-
- this.currentCount--;
- switch (mode) {
- case RLE:
- return this.currentValue;
- case PACKED:
- return this.currentBuffer[currentBufferIdx++];
- }
- throw new RuntimeException("Unreachable");
- }
-
- public void readBatchOfDictionaryIds(
- final IntVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader dictionaryEncodedValuesReader) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, idx, n);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- vector.set(idx, dictionaryEncodedValuesReader.readInteger());
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
- // check definition level.
- private void readDictionaryIdsInternal(final IntVector c, final int numValsInVector, final int numValuesToRead) {
- int left = numValuesToRead;
- int idx = numValsInVector;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- c.set(idx, currentValue);
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- c.set(idx, currentBuffer[currentBufferIdx]);
- currentBufferIdx++;
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfIntegers(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- ArrowBuf dataBuffer = vector.getDataBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- bufferIdx =
- fillFixWidthValueBuffer(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- dataBuffer,
- n);
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
- //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
- dataBuffer.setInt(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth).getInt());
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- bufferIdx++;
- } else {
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedIntegers(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setInt(idx, dict.decodeToInt(valuesReader.readInteger()));
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedIntegersInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setInt(idx, dict.decodeToInt(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setInt(idx, dict.decodeToInt(currentBuffer[currentBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfLongs(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- ArrowBuf dataBuffer = vector.getDataBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- bufferIdx =
- fillFixWidthValueBuffer(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- dataBuffer,
- n);
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
- //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
- dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- bufferIdx++;
- } else {
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedLongs(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setLong(idx, dict.decodeToLong(valuesReader.readInteger()));
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedLongsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentBuffer[currentBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfFloats(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- ArrowBuf dataBuffer = vector.getDataBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- bufferIdx =
- fillFixWidthValueBuffer(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- dataBuffer,
- n);
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- //ByteBuffer buffer = valuesReader.getBuffer(typeWidth);
- //dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
- dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
- //dataBuffer.setFloat(bufferIdx, valuesReader.getBuffer(typeWidth).getFloat());
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- bufferIdx++;
- } else {
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedFloats(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(valuesReader.readInteger()));
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedFloatsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentBuffer[currentBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDoubles(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- ArrowBuf dataBuffer = vector.getDataBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- bufferIdx =
- fillFixWidthValueBuffer(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- dataBuffer,
- n);
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- bufferIdx++;
- } else {
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedDoubles(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(valuesReader.readInteger()));
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedDoublesInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentBuffer[currentBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfFixedWidthBinary(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- // for (int i = 0; i < n; i++) {
- // //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- // validityBufferIdx++;
- // }
- for (int i = 0; i < n; i++) {
- bufferIdx = setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
- }
- } else {
- for (int i = 0; i < n; i++) {
- //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- bufferIdx = setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
- } else {
- //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedFixedWidthBinary(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentBuffer[currentBufferIdx++]).getBytesUnsafe());
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfFixedLengthDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- //ArrowBuf validityBuffer = vector.getValidityBuffer();
- //ArrowBuf dataBuffer = vector.getDataBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- // for (int i = 0; i < n; i++) {
- // //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- // validityBufferIdx++;
- // }
- for (int i = 0; i < n; i++) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- //bytesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
- bufferIdx++;
- }
- } else {
- for (int i = 0; i < n; i++) {
- //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
- bufferIdx++;
- } else {
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedFixedLengthDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- // TODO: samarth I am assuming/hopeful that the decimalBytes array has typeWidth length
- byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- // TODO: samarth I am assuming/hopeful that the decimal bytes has typeWidth length
- byte[] decimalBytes = dict.decodeToBinary(currentBuffer[currentBufferIdx++]).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- /**
- * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
- * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
- * vector. It appropriately sets the validity buffer in the Arrow vector.
- */
- public void readBatchVarWidth(
- final FieldVector vector,
- final int numValsInVector,
- final int batchSize,
- NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- ArrowBuf dataBuffer = vector.getDataBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- // for (int i = 0; i < n; i++) {
- // //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- // validityBufferIdx++;
- // }
- for (int i = 0; i < n; i++) {
- int len = valuesReader.readInteger();
- ByteBuffer buffer = valuesReader.getBuffer(len);
- ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
- dataBuffer.writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
- bufferIdx++;
- }
- } else {
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- nullabilityHolder.setNulls(bufferIdx, n);
- bufferIdx += n;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- int len = valuesReader.readInteger();
- ByteBuffer buffer = valuesReader.getBuffer(len);
- ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
- dataBuffer.writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
- bufferIdx++;
- } else {
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedVarWidth(
- final FieldVector vector, final int numValsInVector,
- final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader dictionaryEncodedValuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- ((BaseVariableWidthVector)vector).setSafe(idx, dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe());
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(FieldVector vector, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- ((BaseVariableWidthVector)vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- ((BaseVariableWidthVector)vector).setSafe(idx, dict.decodeToBinary(currentBuffer[currentBufferIdx++]).getBytesUnsafe());
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfIntLongBackedDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- ArrowBuf dataBuffer = vector.getDataBuffer();
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- dataBuffer.setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
- bufferIdx++;
- }
- } else {
- nullabilityHolder.setNulls(bufferIdx, n);
- bufferIdx += n;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- dataBuffer.setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
- bufferIdx++;
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- } else {
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
- idx += n;
- } else {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(valuesReader.readInteger()) : dict.decodeToLong(valuesReader.readInteger())));
- idx++;
- } else {
- nullabilityHolder.setNull(idx);
- idx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(FieldVector vector, final int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentBuffer[currentBufferIdx++])));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- public void readBatchOfBooleans(
- final FieldVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
- bufferIdx++;
- }
- } else {
- for (int i = 0; i < n; i++) {
- ((BitVector) vector).setNull(bufferIdx);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefLevel) {
- ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
- bufferIdx++;
- } else {
- ((BitVector) vector).setNull(bufferIdx);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
- }
-
- private int setBinaryInVector(VarBinaryVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
- byte[] byteArray = new byte[typeWidth];
- valuesReader.getBuffer(typeWidth).get(byteArray);
- vector.setSafe(bufferIdx, byteArray);
- bufferIdx++;
- return bufferIdx;
- }
-
- private int fillFixWidthValueBuffer(
- int typeWidth, int maxDefLevel, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader, int bufferIdx, ArrowBuf dataBuffer, int n) {
- if (currentValue == maxDefLevel) {
- // for (int i = 0; i < n; i++) {
- // //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- // validityBufferIdx++;
- // }
- ByteBuffer buffer = valuesReader.getBuffer(n * typeWidth);
- dataBuffer.setBytes(bufferIdx * typeWidth, buffer);
- bufferIdx += n;
- } else {
- for (int i = 0; i < n; i++) {
- //BitVectorHelper.setValidityBit(validityBuffer, validityBufferIdx, 0);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- return bufferIdx;
- }
-
-}
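The deleted reader above (and the VectorizedParquetValuesReader that replaces it) decodes Parquet's run-length/bit-packed hybrid encoding. The core decision sits in readNextGroup(): the low bit of a varint header selects the mode, and the remaining bits carry either the run length or the number of 8-value bit-packed groups. A self-contained condensation of that header handling:

    // Condensed from readNextGroup() above (error handling omitted).
    static void describeHeader(int header) {
      if ((header & 1) == 0) {
        // RLE: one value, read separately, repeated (header >>> 1) times.
        int runLength = header >>> 1;
        System.out.println("RLE run of " + runLength + " values");
      } else {
        // PACKED: (header >>> 1) groups of 8 values, each bitWidth bits wide.
        int numGroups = header >>> 1;
        System.out.println("bit-packed group of " + (numGroups * 8) + " values");
      }
    }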
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
index a3c8460..7b30948 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ArrowBuf;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
-import org.apache.iceberg.parquet.NullabilityHolder;
-import org.apache.iceberg.parquet.ParquetUtil;
-import org.apache.iceberg.parquet.VectorReader;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.VectorHolder;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -58,7 +59,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
private final boolean isVectorDictEncoded;
private ArrowColumnVector[] childColumns;
- public IcebergArrowColumnVector(VectorReader.VectorHolder holder, NullabilityHolder nulls) {
+ public IcebergArrowColumnVector(VectorHolder holder, NullabilityHolder nulls) {
super(ArrowUtils.fromArrowField(holder.getVector().getField()));
this.nullabilityHolder = nulls;
this.columnDescriptor = holder.getDescriptor();
@@ -67,6 +68,11 @@ public class IcebergArrowColumnVector extends ColumnVector {
this.accessor = getVectorAccessor(columnDescriptor, holder.getVector());
}
+ @VisibleForTesting
+ public ArrowVectorAccessor getAccessor() {
+ return accessor;
+ }
+
@Override
public void close() {
if (childColumns != null) {
@@ -169,7 +175,8 @@ public class IcebergArrowColumnVector extends ColumnVector {
@Override
public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
- private abstract class ArrowVectorAccessor {
+ @VisibleForTesting
+ public abstract class ArrowVectorAccessor {
private final ValueVector vector;
@@ -229,6 +236,11 @@ public class IcebergArrowColumnVector extends ColumnVector {
ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException();
}
+
+ @VisibleForTesting
+ public ValueVector getUnderlyingArrowVector() {
+ return vector;
+ }
}
private ArrowVectorAccessor getVectorAccessor(ColumnDescriptor desc, ValueVector vector) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
index 1815414..9a833b0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
@@ -1,8 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.DecimalVector;
-import org.apache.iceberg.parquet.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
/**
*
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
index 7115564..6d8922b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
@@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.iceberg.parquet.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
/**
* Extension of Arrow's @{@link VarBinaryVector}. The whole reason of having
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
index 2b7bc53..646ddd1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
-import org.apache.iceberg.parquet.NullabilityHolder;
+import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
/**
- * Extension of Arrow's @{@link VarCharVector}. The whole reason of having
+ * Extension of Arrow's @{@link VarCharVector}. The reason of having
* this implementation is to override the expensive {@link VarCharVector#isSet(int)} method.
*/
public class IcebergVarcharArrowVector extends VarCharVector {
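The diff shows only this file's header, but the comment states the intent: replace the bit arithmetic behind VarCharVector#isSet(int) with a lookup against the batch's NullabilityHolder. A sketch of that kind of override; the nullabilityHolder field and its isNullAt accessor are assumptions for illustration, not the class's confirmed API:

    // Sketch only; nullabilityHolder and isNullAt are assumed.
    @Override
    public int isSet(int index) {
      // Plain boolean[] lookup instead of decoding a bit from the Arrow
      // validity buffer.
      return nullabilityHolder.isNullAt(index) ? 0 : 1;
    }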
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
similarity index 67%
rename from parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
index 7204278..64c78af 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnarBatchReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
@@ -1,33 +1,34 @@
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
import org.apache.arrow.vector.FieldVector;
import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.util.List;
import java.util.Map;
-public class ColumnarBatchReader implements BatchedReader {
- private static final Logger LOG = LoggerFactory.getLogger(ColumnarBatchReader.class);
- private final VectorReader[] readers;
+/**
+ * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's
+ * vectorized read path. The {@link ColumnarBatch} returned is created by passing in the
+ * Arrow vectors populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
+ */
+public class ColumnarBatchReaders implements VectorizedReader {
+ private final VectorizedArrowReader[] readers;
- public ColumnarBatchReader(List<Type> types,
- Types.StructType icebergExpectedFields,
- List<BatchedReader> readers) {
- this.readers = (VectorReader[]) Array.newInstance(
- VectorReader.class, readers.size());
+ public ColumnarBatchReaders(List<Type> types,
+ Types.StructType icebergExpectedFields,
+ List<VectorizedReader> readers) {
+ this.readers = (VectorizedArrowReader[]) Array.newInstance(
+ VectorizedArrowReader.class, readers.size());
int i = 0;
- for (BatchedReader reader : readers) {
- this.readers[i] = (VectorReader) reader;
+ for (VectorizedReader reader : readers) {
+ this.readers[i] = (VectorizedArrowReader) reader;
i++;
}
@@ -47,7 +48,7 @@ public class ColumnarBatchReader implements BatchedReader {
int numRows = 0;
for (int i = 0; i < readers.length; i += 1) {
NullabilityHolder nullabilityHolder = new NullabilityHolder(readers[i].batchSize());
- VectorReader.VectorHolder holder = readers[i].read(nullabilityHolder);
+ VectorHolder holder = readers[i].read(nullabilityHolder);
FieldVector vector = holder.getVector();
icebergArrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
if (i > 0 && numRows != vector.getValueCount()) {
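The javadoc above describes the hand-off to Spark: each VectorizedArrowReader fills an Arrow vector, the result is wrapped as an IcebergArrowColumnVector, and the wrappers become a ColumnarBatch. A minimal sketch of the final wrapping step, assuming Spark 2.x's ColumnarBatch API:

    // After every column reader has produced its vector:
    ColumnarBatch batch = new ColumnarBatch(icebergArrowColumnVectors);
    batch.setNumRows(numRows);
    return batch;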
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
similarity index 96%
copy from parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
copy to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
index dbffdb0..70b84d9 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
public class NullabilityHolder {
private final boolean[] isNull;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
new file mode 100644
index 0000000..f11c968
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container class for holding the Arrow vector holding a batch of values
+ * along with other state needed for reading values out of it.
+ */
+public class VectorHolder {
+ private final ColumnDescriptor columnDescriptor;
+ private final FieldVector vector;
+ private final boolean isDictionaryEncoded;
+
+ @Nullable
+ private final Dictionary dictionary;
+
+
+ public VectorHolder(ColumnDescriptor columnDescriptor, FieldVector vector, boolean isDictionaryEncoded, Dictionary dictionary) {
+ this.columnDescriptor = columnDescriptor;
+ this.vector = vector;
+ this.isDictionaryEncoded = isDictionaryEncoded;
+ this.dictionary = dictionary;
+ }
+
+ public ColumnDescriptor getDescriptor() {
+ return columnDescriptor;
+ }
+
+ public FieldVector getVector() {
+ return vector;
+ }
+
+ public boolean isDictionaryEncoded() {
+ return isDictionaryEncoded;
+ }
+
+ public Dictionary getDictionary() {
+ return dictionary;
+ }
+
+}
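
A VectorHolder is consumed roughly as follows (a sketch using the names above, not code from
this commit):

    NullabilityHolder nulls = new NullabilityHolder(batchSize);
    VectorHolder holder = reader.read(nulls);   // reader is a VectorizedArrowReader
    FieldVector vector = holder.getVector();    // Arrow vector holding the batch
    if (holder.isDictionaryEncoded()) {
      // the vector holds dictionary ids; actual values are decoded lazily
      Dictionary dict = holder.getDictionary();
    }
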
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
similarity index 75%
rename from parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
index 661097f..f44c5dc 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.*;
@@ -25,6 +25,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.iceberg.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergDecimalArrowVector;
import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarBinaryArrowVector;
import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarcharArrowVector;
@@ -37,31 +38,20 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.PrimitiveType;
-import javax.annotation.Nullable;
import java.util.Map;
/***
- * Creates and allocates space for Arrow field vectors based on Iceberg data type mapped to Arrow type.
- * Iceberg to Arrow Type mapping :
- * icebergType : LONG - Field Vector Type : org.apache.arrow.vector.BigIntVector
- * icebergType : STRING - Field Vector Type : org.apache.arrow.vector.VarCharVector
- * icebergType : BOOLEAN - Field Vector Type : org.apache.arrow.vector.BitVector
- * icebergType : INTEGER - Field Vector Type : org.apache.arrow.vector.IntVector
- * icebergType : FLOAT - Field Vector Type : org.apache.arrow.vector.Float4Vector
- * icebergType : DOUBLE - Field Vector Type : org.apache.arrow.vector.Float8Vector
- * icebergType : DATE - Field Vector Type : org.apache.arrow.vector.DateDayVector
- * icebergType : TIMESTAMP - Field Vector Type : org.apache.arrow.vector.TimeStampMicroTZVector
- * icebergType : STRING - Field Vector Type : org.apache.arrow.vector.VarCharVector
- * icebergType : BINARY - Field Vector Type : org.apache.arrow.vector.VarBinaryVector
- * icebergField : DECIMAL - Field Vector Type : org.apache.arrow.vector.DecimalVector
+ * A {@link VectorizedReader} that reads batches of values into Arrow vectors. It also takes
+ * care of allocating the right kind of Arrow vector depending on the corresponding
+ * Iceberg/Parquet data type.
*/
-public class VectorReader implements BatchedReader {
+public class VectorizedArrowReader implements VectorizedReader {
public static final int DEFAULT_BATCH_SIZE = 5000;
public static final int UNKNOWN_WIDTH = -1;
private final ColumnDescriptor columnDescriptor;
private final int batchSize;
- private final BatchedColumnIterator batchedColumnIterator;
+ private final VectorizedColumnIterator vectorizedColumnIterator;
private final boolean isFixedLengthDecimal;
private final boolean isVarWidthType;
private final boolean isFixedWidthBinary;
@@ -86,7 +76,7 @@ public class VectorReader implements BatchedReader {
// this value if Arrow ends up changing this default.
private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
- public VectorReader(
+ public VectorizedArrowReader(
ColumnDescriptor desc,
Types.NestedField icebergField,
BufferAllocator rootAlloc,
@@ -104,7 +94,7 @@ public class VectorReader implements BatchedReader {
this.isLongType = ParquetUtil.isLongType(desc);
this.isFloatType = ParquetUtil.isFloatType(desc);
this.isDoubleType = ParquetUtil.isDoubleType(desc);
- this.batchedColumnIterator = new BatchedColumnIterator(desc, "", batchSize);
+ this.vectorizedColumnIterator = new VectorizedColumnIterator(desc, "", batchSize);
}
public VectorHolder read(NullabilityHolder nullabilityHolder) {
@@ -112,35 +102,39 @@ public class VectorReader implements BatchedReader {
typeWidth = allocateFieldVector(rootAlloc, icebergField, columnDescriptor);
}
vec.setValueCount(0);
- if (batchedColumnIterator.hasNext()) {
+ if (vectorizedColumnIterator.hasNext()) {
if (allPagesDictEncoded) {
- batchedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
} else {
if (isFixedLengthDecimal) {
- batchedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
((IcebergDecimalArrowVector) vec).setNullabilityHolder(nullabilityHolder);
} else if (isFixedWidthBinary) {
- batchedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth, nullabilityHolder);
+ // Fixed width binary type values are stored in an IcebergVarBinaryArrowVector as well
+ if (vec instanceof IcebergVarBinaryArrowVector) {
+ ((IcebergVarBinaryArrowVector) vec).setNullabilityHolder(nullabilityHolder);
+ }
+ vectorizedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth, nullabilityHolder);
} else if (isVarWidthType) {
if (vec instanceof IcebergVarcharArrowVector) {
((IcebergVarcharArrowVector) vec).setNullabilityHolder(nullabilityHolder);
} else if (vec instanceof IcebergVarBinaryArrowVector) {
((IcebergVarBinaryArrowVector) vec).setNullabilityHolder(nullabilityHolder);
}
- batchedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
} else if (isBooleanType) {
- batchedColumnIterator.nextBatchBoolean(vec, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchBoolean(vec, nullabilityHolder);
} else if (isPaddedDecimal) {
((IcebergDecimalArrowVector) vec).setNullabilityHolder(nullabilityHolder);
- batchedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
} else if (isIntType) {
- batchedColumnIterator.nextBatchIntegers(vec, typeWidth, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchIntegers(vec, typeWidth, nullabilityHolder);
} else if (isLongType) {
- batchedColumnIterator.nextBatchLongs(vec, typeWidth, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchLongs(vec, typeWidth, nullabilityHolder);
} else if (isFloatType) {
- batchedColumnIterator.nextBatchFloats(vec, typeWidth, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchFloats(vec, typeWidth, nullabilityHolder);
} else if (isDoubleType) {
- batchedColumnIterator.nextBatchDoubles(vec, typeWidth, nullabilityHolder);
+ vectorizedColumnIterator.nextBatchDoubles(vec, typeWidth, nullabilityHolder);
}
}
}
@@ -252,7 +246,7 @@ public class VectorReader implements BatchedReader {
DictionaryPageReadStore dictionaryPageReadStore,
Map<ColumnPath, Boolean> columnDictEncoded) {
allPagesDictEncoded = columnDictEncoded.get(ColumnPath.get(columnDescriptor.getPath()));
- dictionary = batchedColumnIterator.setRowGroupInfo(source, dictionaryPageReadStore, allPagesDictEncoded);
+ dictionary = vectorizedColumnIterator.setRowGroupInfo(source, dictionaryPageReadStore, allPagesDictEncoded);
}
@Override
@@ -264,42 +258,5 @@ public class VectorReader implements BatchedReader {
return batchSize;
}
- public Types.NestedField getIcebergField() {
- return icebergField;
- }
-
- public static class VectorHolder {
- private final ColumnDescriptor columnDescriptor;
- private final FieldVector vector;
- private final boolean isDictionaryEncoded;
-
- @Nullable
- private final Dictionary dictionary;
-
-
- public VectorHolder(ColumnDescriptor columnDescriptor, FieldVector vector, boolean isDictionaryEncoded, Dictionary dictionary) {
- this.columnDescriptor = columnDescriptor;
- this.vector = vector;
- this.isDictionaryEncoded = isDictionaryEncoded;
- this.dictionary = dictionary;
- }
-
- public ColumnDescriptor getDescriptor() {
- return columnDescriptor;
- }
-
- public FieldVector getVector() {
- return vector;
- }
-
- public boolean isDictionaryEncoded() {
- return isDictionaryEncoded;
- }
-
- public Dictionary getDictionary() {
- return dictionary;
- }
-
- }
}
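
Putting it together, a VectorizedArrowReader is configured once per row group and then read
batch by batch (a sketch; constructor arguments not visible in this diff are elided):

    VectorizedArrowReader reader =
        new VectorizedArrowReader(desc, icebergField, rootAlloc /* , ... */);
    reader.setRowGroupInfo(pageStore, dictionaryPageReadStore, columnDictEncoded);
    NullabilityHolder nulls = new NullabilityHolder(reader.batchSize());
    VectorHolder holder = reader.read(nulls);
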
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
similarity index 76%
rename from parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
index 0a6c28d..fafaec6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedColumnIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
@@ -17,49 +17,49 @@
* under the License.
*/
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
import java.io.IOException;
-import java.util.Map;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.page.*;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.io.ParquetDecodingException;
-public class BatchedColumnIterator {
+/**
+ * Vectorized version of the ColumnIterator that reads the values in the data pages of a column
+ * within a row group in a batched fashion.
+ */
+public class VectorizedColumnIterator {
private final ColumnDescriptor desc;
- private final BatchedPageIterator batchedPageIterator;
+ private final VectorizedPageIterator vectorizedPageIterator;
// state reset for each row group
- private PageReader pageSource = null;
+ private PageReader columnPageReader = null;
private long totalValuesCount = 0L;
private long valuesRead = 0L;
private long advanceNextPageCount = 0L;
private final int batchSize;
- private boolean shouldVectorBeDictionaryEncoded;
- public BatchedColumnIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
+ public VectorizedColumnIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
this.desc = desc;
this.batchSize = batchSize;
- this.batchedPageIterator = new BatchedPageIterator(desc, writerVersion, batchSize);
+ this.vectorizedPageIterator = new VectorizedPageIterator(desc, writerVersion, batchSize);
}
public Dictionary setRowGroupInfo(PageReadStore store,
DictionaryPageReadStore dictionaryPageReadStore,
boolean allPagesDictEncoded) {
- this.pageSource = store.getPageReader(desc);
- this.totalValuesCount = pageSource.getTotalValueCount();
+ this.columnPageReader = store.getPageReader(desc);
+ this.totalValuesCount = columnPageReader.getTotalValueCount();
this.valuesRead = 0L;
this.advanceNextPageCount = 0L;
- this.batchedPageIterator.reset();
- Dictionary dict = readDictionary(desc, dictionaryPageReadStore);
- this.batchedPageIterator.setDictionary(dict, allPagesDictEncoded);
- this.shouldVectorBeDictionaryEncoded = allPagesDictEncoded;
+ this.vectorizedPageIterator.reset();
+ Dictionary dict = readDictionaryForColumn(desc, dictionaryPageReadStore);
+ this.vectorizedPageIterator.setDictionaryForColumn(dict, allPagesDictEncoded);
advance();
return dict;
}
@@ -67,11 +67,11 @@ public class BatchedColumnIterator {
private void advance() {
if (valuesRead >= advanceNextPageCount) {
// A Parquet page may be empty, i.e. contain no values
- while (!batchedPageIterator.hasNext()) {
- DataPage page = pageSource.readPage();
+ while (!vectorizedPageIterator.hasNext()) {
+ DataPage page = columnPageReader.readPage();
if (page != null) {
- batchedPageIterator.setPage(page);
- this.advanceNextPageCount += batchedPageIterator.currentPageCount();
+ vectorizedPageIterator.setPage(page);
+ this.advanceNextPageCount += vectorizedPageIterator.currentPageCount();
} else {
return;
}
@@ -90,7 +90,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchIntegers(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchIntegers(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, holder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -105,7 +105,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchDictionaryIds(vector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchDictionaryIds(vector, batchSize - rowsReadSoFar,
rowsReadSoFar, holder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -120,7 +120,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchLongs(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchLongs(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, holder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -135,7 +135,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchFloats(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchFloats(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, holder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -150,7 +150,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchDoubles(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchDoubles(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, holder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -165,7 +165,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -180,7 +180,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -195,7 +195,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchVarWidthType(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchVarWidthType(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -210,7 +210,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -225,7 +225,7 @@ public class BatchedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = batchedPageIterator.nextBatchBoolean(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch = vectorizedPageIterator.nextBatchBoolean(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -233,8 +233,8 @@ public class BatchedColumnIterator {
}
}
- private Dictionary readDictionary(ColumnDescriptor desc,
- DictionaryPageReadStore dictionaryPageReadStore) {
+ private Dictionary readDictionaryForColumn(ColumnDescriptor desc,
+ DictionaryPageReadStore dictionaryPageReadStore) {
if (dictionaryPageReadStore == null) {
return null;
}
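
Each nextBatch* method above shares the same accumulation loop: keep pulling pages until the
requested batch is full or the column is exhausted. Schematically (nextBatchXxx stands in for
any of the type-specific methods; trailing bookkeeping is elided):

    int rowsReadSoFar = 0;
    while (rowsReadSoFar < batchSize && hasNext()) {
      advance(); // position vectorizedPageIterator on a non-empty page
      int rowsInThisBatch = vectorizedPageIterator.nextBatchXxx(
          fieldVector, batchSize - rowsReadSoFar, rowsReadSoFar, typeWidth, nullabilityHolder);
      rowsReadSoFar += rowsInThisBatch;
      this.valuesRead += rowsInThisBatch;
    }
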
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
similarity index 85%
rename from parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
index 99ae8e1..618be17 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BatchedPageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
@@ -17,13 +17,14 @@
* under the License.
*/
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.parquet.vectorized;
import com.google.common.base.Preconditions;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.iceberg.parquet.BytesReader;
import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
@@ -43,19 +44,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-import static java.lang.String.format;
import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-public class BatchedPageIterator {
- private static final Logger LOG = LoggerFactory.getLogger(BatchedPageIterator.class);
- private VectorizedValuesReader definitionLevelReader;
- private boolean eagerDecodeDictionary;
- private final int batchSize;
-
- public BatchedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
+public class VectorizedPageIterator {
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedPageIterator.class);
+ public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
this.desc = desc;
this.writerVersion = writerVersion;
- this.batchSize = batchSize;
}
private final ColumnDescriptor desc;
@@ -64,18 +59,20 @@ public class BatchedPageIterator {
// iterator state
private boolean hasNext = false;
private int triplesRead = 0;
- private int currentDL = 0;
- private int currentRL = 0;
// page bookkeeping
private Dictionary dict = null;
private DataPage page = null;
private int triplesCount = 0;
- private Encoding valueEncoding = null;
- private IntIterator definitionLevels = null;
+
+ // Needed once we add support for complex types. Unused for now.
private IntIterator repetitionLevels = null;
+ private int currentRL = 0;
+
+ private VectorizedParquetValuesReader definitionLevelReader;
+ private boolean eagerDecodeDictionary;
private BytesReader plainValuesReader = null;
- private VectorizedValuesReader dictionaryEncodedValuesReader = null;
+ private VectorizedParquetValuesReader dictionaryEncodedValuesReader = null;
private boolean allPagesDictEncoded;
public void setPage(DataPage page) {
@@ -98,10 +95,8 @@ public class BatchedPageIterator {
}
// Dictionary is set per row group
- public void setDictionary(Dictionary dict, boolean allPagesDictEncoded) {
+ public void setDictionaryForColumn(Dictionary dict, boolean allPagesDictEncoded) {
this.dict = dict;
- // if all pages are not dictionary encoded, then we eagerly decode the
- // dictionary encoded data before writing them to the vectors.
this.allPagesDictEncoded = allPagesDictEncoded;
}
@@ -109,9 +104,9 @@ public class BatchedPageIterator {
this.page = null;
this.triplesCount = 0;
this.triplesRead = 0;
- this.definitionLevels = null;
this.repetitionLevels = null;
this.plainValuesReader = null;
+ this.definitionLevelReader = null;
this.hasNext = false;
}
@@ -124,7 +119,7 @@ public class BatchedPageIterator {
}
/**
- * Method for reading a batch of dictionary ids. Like definition levels, dictionary ids in Parquet are RLE
+ * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like definition levels, dictionary ids in Parquet are RLE
* encoded as well.
*/
public int nextBatchDictionaryIds(final IntVector vector, final int expectedBatchSize,
@@ -141,9 +136,7 @@ public class BatchedPageIterator {
}
/**
- * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
- * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
- * vector. It appropriately sets the validity buffer in the Arrow vector.
+ * Method for reading a batch of values of INT32 data type.
*/
public int nextBatchIntegers(final FieldVector vector, final int expectedBatchSize,
final int numValsInVector,
@@ -163,9 +156,7 @@ public class BatchedPageIterator {
}
/**
- * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
- * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
- * vector. It appropriately sets the validity buffer in the Arrow vector.
+ * Method for reading a batch of values of INT64 data type.
*/
public int nextBatchLongs(final FieldVector vector, final int expectedBatchSize,
final int numValsInVector,
@@ -185,9 +176,7 @@ public class BatchedPageIterator {
}
/**
- * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
- * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
- * vector. It appropriately sets the validity buffer in the Arrow vector.
+ * Method for reading a batch of values of FLOAT data type.
*/
public int nextBatchFloats(final FieldVector vector, final int expectedBatchSize,
final int numValsInVector,
@@ -207,9 +196,7 @@ public class BatchedPageIterator {
}
/**
- * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
- * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
- * vector. It appropriately sets the validity buffer in the Arrow vector.
+ * Method for reading a batch of values of DOUBLE data type.
*/
public int nextBatchDoubles(final FieldVector vector, final int expectedBatchSize,
final int numValsInVector,
@@ -234,7 +221,7 @@ public class BatchedPageIterator {
/**
* Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
- * Arrow stores all decimals in 16 bytes. This method provides the necessary padding to the decimals read.
+ * Since Arrow stores all decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
*/
public int nextBatchIntLongBackedDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
final int typeWidth, NullabilityHolder nullabilityHolder) {
@@ -341,7 +328,6 @@ public class BatchedPageIterator {
private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
ValuesReader previousReader = plainValuesReader;
- this.valueEncoding = dataEncoding;
this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded;
if (dataEncoding.usesDictionary()) {
if (dict == null) {
@@ -349,7 +335,7 @@ public class BatchedPageIterator {
"could not read page in col " + desc + " as the dictionary was missing for encoding " + dataEncoding);
}
try {
- dictionaryEncodedValuesReader = new VectorizedValuesReader(desc.getMaxDefinitionLevel());//(DictionaryValuesReader) dataEncoding.getDictionaryBasedValuesReader(desc, ValuesType.VALUES, dict);
+ dictionaryEncodedValuesReader = new VectorizedParquetValuesReader(desc.getMaxDefinitionLevel());
dictionaryEncodedValuesReader.initFromPage(valueCount, in);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page in col " + desc, e);
@@ -371,11 +357,10 @@ public class BatchedPageIterator {
ValuesReader rlReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
ValuesReader dlReader;
int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
- this.definitionLevelReader = new VectorizedValuesReader(
+ this.definitionLevelReader = new VectorizedParquetValuesReader(
bitWidth, desc.getMaxDefinitionLevel());
dlReader = this.definitionLevelReader;
this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
- this.definitionLevels = new ValuesReaderIntIterator(definitionLevelReader);
try {
BytesInput bytes = page.getBytes();
ByteBufferInputStream in = bytes.toInputStream();
@@ -390,12 +375,11 @@ public class BatchedPageIterator {
private void initFromPage(DataPageV2 page) {
this.triplesCount = page.getValueCount();
this.repetitionLevels = newRLEIterator(desc.getMaxRepetitionLevel(), page.getRepetitionLevels());
- this.definitionLevels = newRLEIterator(desc.getMaxDefinitionLevel(), page.getDefinitionLevels());
LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
try {
int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
- this.definitionLevelReader = new VectorizedValuesReader(bitWidth, false,
+ this.definitionLevelReader = new VectorizedParquetValuesReader(bitWidth, false,
desc.getMaxDefinitionLevel());
definitionLevelReader.initFromPage(triplesCount, page.getDefinitionLevels().toInputStream());
} catch (IOException e) {
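
To restate the dictionary handling in initDataReader above in one line: dictionary ids are kept
raw (and decoded lazily) only when every page in the row group is dictionary encoded; otherwise
they are decoded to values while reading:

    boolean eagerDecodeDictionary =
        dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded;
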
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
new file mode 100644
index 0000000..5500352
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
@@ -0,0 +1,1233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.vector.*;
+import org.apache.iceberg.parquet.BytesReader;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A values reader for Parquet's run-length encoded data that reads column data in batches
+ * instead of one value at a time.
+ * This is based on the version in Apache Spark with these changes:
+ * <ul>
+ * <li>Writes batches of retrieved values to Arrow vectors</li>
+ * <li>If not all pages of a column within the row group are dictionary encoded, then
+ * dictionary ids are eagerly decoded into actual values before being written
+ * to the Arrow vectors</li>
+ * </ul>
+ */
+public final class VectorizedParquetValuesReader extends ValuesReader {
+
+ // Current decoding mode. The encoded data contains groups of either run length encoded data
+ // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
+ // the number of values in the group.
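+ // For example, a header of 18 (low bit 0) marks an RLE run of 18 >>> 1 = 9 repeats of a single
+ // value, while a header of 3 (low bit 1) marks 3 >>> 1 = 1 bit-packed group of 8 values.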
+ private enum MODE {
+ RLE,
+ PACKED
+ }
+
+ // Encoded data.
+ private ByteBufferInputStream in;
+
+ // bit/byte width of decoded data and utility to batch unpack them.
+ private int bitWidth;
+ private int bytesWidth;
+ private BytePacker packer;
+
+ // Current decoding mode and values
+ private MODE mode;
+ private int currentCount;
+ private int currentValue;
+
+ // Buffer of decoded values if the values are PACKED.
+ private int[] packedValuesBuffer = new int[16];
+ private int packedValuesBufferIdx = 0;
+
+ // If true, the bit width is fixed. This decoder is used in different places and this also
+ // controls whether we need to read the bit width from the beginning of the data stream.
+ private final boolean fixedWidth;
+ private final boolean readLength;
+ private final int maxDefLevel;
+
+ public VectorizedParquetValuesReader(int maxDefLevel) {
+ this.maxDefLevel = maxDefLevel;
+ this.fixedWidth = false;
+ this.readLength = false;
+ }
+
+ public VectorizedParquetValuesReader(
+ int bitWidth,
+ int maxDefLevel) {
+ this.fixedWidth = true;
+ this.readLength = bitWidth != 0;
+ this.maxDefLevel = maxDefLevel;
+ init(bitWidth);
+ }
+
+ public VectorizedParquetValuesReader(
+ int bitWidth,
+ boolean readLength,
+ int maxDefLevel) {
+ this.fixedWidth = true;
+ this.readLength = readLength;
+ this.maxDefLevel = maxDefLevel;
+ init(bitWidth);
+ }
+
+ @Override
+ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+ this.in = in;
+ if (fixedWidth) {
+ // initialize for repetition and definition levels
+ if (readLength) {
+ int length = readIntLittleEndian();
+ this.in = in.sliceStream(length);
+ }
+ } else {
+ // initialize for values
+ if (in.available() > 0) {
+ init(in.read());
+ }
+ }
+ if (bitWidth == 0) {
+ // 0 bit width, treat this as an RLE run of valueCount number of 0's.
+ this.mode = MODE.RLE;
+ this.currentCount = valueCount;
+ this.currentValue = 0;
+ } else {
+ this.currentCount = 0;
+ }
+ }
+
+
+ /**
+ * Initializes the internal state for decoding ints of `bitWidth`.
+ */
+ private void init(int bitWidth) {
+ Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+ this.bitWidth = bitWidth;
+ this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
+ this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+ }
+
+ /**
+ * Reads the next varint encoded int.
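+ * For example, the bytes 0xAC 0x02 decode to (0xAC & 0x7F) | (0x02 << 7) = 300.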
+ */
+ private int readUnsignedVarInt() throws IOException {
+ int value = 0;
+ int shift = 0;
+ int b;
+ do {
+ b = in.read();
+ value |= (b & 0x7F) << shift;
+ shift += 7;
+ } while ((b & 0x80) != 0);
+ return value;
+ }
+
+ /**
+ * Reads the next 4 byte little endian int.
+ */
+ private int readIntLittleEndian() throws IOException {
+ int ch4 = in.read();
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ /**
+ * Reads the next byteWidth little endian int.
+ */
+ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
+ switch (bytesWidth) {
+ case 0:
+ return 0;
+ case 1:
+ return in.read();
+ case 2: {
+ int ch2 = in.read();
+ int ch1 = in.read();
+ return (ch1 << 8) + ch2;
+ }
+ case 3: {
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
+ return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+ }
+ case 4: {
+ return readIntLittleEndian();
+ }
+ }
+ throw new RuntimeException("Unreachable");
+ }
+
+ /**
+ * Reads the next group.
+ */
+ private void readNextGroup() {
+ try {
+ int header = readUnsignedVarInt();
+ this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+ switch (mode) {
+ case RLE:
+ this.currentCount = header >>> 1;
+ this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+ return;
+ case PACKED:
+ int numGroups = header >>> 1;
+ this.currentCount = numGroups * 8;
+
+ if (this.packedValuesBuffer.length < this.currentCount) {
+ this.packedValuesBuffer = new int[this.currentCount];
+ }
+ packedValuesBufferIdx = 0;
+ int valueIndex = 0;
+ while (valueIndex < this.currentCount) {
+ // values are bit packed 8 at a time, so reading bitWidth bytes will always work
+ ByteBuffer buffer = in.slice(bitWidth);
+ this.packer.unpack8Values(buffer, buffer.position(), this.packedValuesBuffer, valueIndex);
+ valueIndex += 8;
+ }
+ return;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + this.mode);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read from input stream", e);
+ }
+ }
+
+ @Override
+ public boolean readBoolean() {
+ return this.readInteger() != 0;
+ }
+
+ @Override
+ public void skip() {
+ this.readInteger();
+ }
+
+ @Override
+ public int readValueDictionaryId() {
+ return readInteger();
+ }
+
+ @Override
+ public int readInteger() {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+
+ this.currentCount--;
+ switch (mode) {
+ case RLE:
+ return this.currentValue;
+ case PACKED:
+ return this.packedValuesBuffer[packedValuesBufferIdx++];
+ }
+ throw new RuntimeException("Unreachable");
+ }
+
+ public void readBatchOfDictionaryIds(
+ final IntVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, idx, n);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.set(idx, dictionaryEncodedValuesReader.readInteger());
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
+ // check definition level.
+ private void readDictionaryIdsInternal(final IntVector c, final int numValsInVector, final int numValuesToRead) {
+ int left = numValuesToRead;
+ int idx = numValsInVector;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ c.set(idx, currentValue);
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ c.set(idx, packedValuesBuffer[packedValuesBufferIdx]);
+ packedValuesBufferIdx++;
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfIntegers(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ maxDefLevel,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ n);
+ bufferIdx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfDictionaryEncodedIntegers(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.getDataBuffer().setInt(idx, dict.decodeToInt(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void readBatchOfDictionaryEncodedIntegersInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ ArrowBuf dataBuffer = vector.getDataBuffer();
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ dataBuffer.setInt(idx, dict.decodeToInt(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ dataBuffer.setInt(idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfLongs(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ maxDefLevel,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ n);
+ bufferIdx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfDictionaryEncodedLongs(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ ArrowBuf validityBuffer = vector.getValidityBuffer();
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, validityBuffer);
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.getDataBuffer().setLong(idx, dict.decodeToLong(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, validityBuffer);
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void readBatchOfDictionaryEncodedLongsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setLong(idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfFloats(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ maxDefLevel,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ n);
+ bufferIdx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void setValue(int typeWidth, BytesReader valuesReader, int bufferIdx, ArrowBuf validityBuffer, ArrowBuf dataBuffer) {
+ dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
+ BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx);
+ }
+
+ public void readBatchOfDictionaryEncodedFloats(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ ArrowBuf validityBuffer = vector.getValidityBuffer();
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, validityBuffer);
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, validityBuffer);
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void readBatchOfDictionaryEncodedFloatsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfDoubles(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ maxDefLevel,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ n);
+ bufferIdx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfDictionaryEncodedDoubles(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void readBatchOfDictionaryEncodedDoublesInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfFixedWidthBinary(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+ bufferIdx++;
+ }
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfDictionaryEncodedFixedWidthBinary(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfFixedLengthDecimals(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+ bufferIdx++;
+ }
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ public void readBatchOfDictionaryEncodedFixedLengthDecimals(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
+ byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+ System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ // TODO (samarth): verify that the decimal bytes decoded from the dictionary are exactly typeWidth long
+ byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
+ byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+ System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ // TODO (samarth): verify that the decimal bytes decoded from the dictionary are exactly typeWidth long
+ byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
+ byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+ System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ /**
+ * Method for reading batches of variable width data types (i.e. BINARY and STRING). This method
+ * reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
+ * vector. It appropriately sets the validity buffer in the Arrow vector.
+ */
+ public void readBatchVarWidth(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
+ bufferIdx++;
+ }
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void setVarWidthBinaryValue(FieldVector vector, BytesReader valuesReader, int bufferIdx) {
+ int len = valuesReader.readInteger();
+ ByteBuffer buffer = valuesReader.getBuffer(len);
+ // Calling setValueLengthSafe takes care of allocating a larger buffer if
+ // running out of space.
+ ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
+ // It is possible that the data buffer was reallocated. So it is important to
+ // not cache the data buffer reference but instead use vector.getDataBuffer().
+ vector.getDataBuffer().writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
+ // Similarly, we need to get the latest reference to the validity buffer as well
+ // since reallocation changes reference of the validity buffers as well.
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+ }
+
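+ /**
+ * Reads a batch of dictionary-encoded variable-width values (strings and binary). Definition
+ * levels decide which slots are null; non-null runs are decoded through the dictionary.
+ */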
+ public void readBatchOfDictionaryEncodedVarWidth(
+ final FieldVector vector, final int numValsInVector,
+ final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe());
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
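+ /**
+ * Decodes a run of non-null dictionary ids into variable-width binary values.
+ */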
+ private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(FieldVector vector, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
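+ /**
+ * Reads a batch of decimals backed by Parquet INT32/INT64 values, copying each into the
+ * 16-byte slot layout of Arrow's DecimalVector.
+ */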
+ public void readBatchOfIntLongBackedDecimals(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+ vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+ bufferIdx++;
+ }
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+ vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
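+ /**
+ * Dictionary-encoded variant of {@link #readBatchOfIntLongBackedDecimals}.
+ */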
+ public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(valuesReader.readInteger()) : dict.decodeToLong(valuesReader.readInteger())));
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
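+ // Decodes a run of non-null dictionary ids into int/long backed decimal values.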
+ private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(FieldVector vector, final int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ int dictId = packedValuesBuffer[packedValuesBufferIdx++];
+ ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(dictId) : dict.decodeToLong(dictId));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
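+ /**
+ * Reads a batch of booleans, tracking nulls via the definition levels and the NullabilityHolder.
+ */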
+ public void readBatchOfBooleans(
+ final FieldVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ ((BitVector) vector).setSafe(bufferIdx, valuesReader.readBoolean() ? 1 : 0);
+ bufferIdx++;
+ }
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ ((BitVector) vector).setSafe(bufferIdx, valuesReader.readBoolean() ? 1 : 0);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+ }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
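+ // Copies typeWidth plain-encoded bytes into the VarBinaryVector at the given index.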
+ private void setBinaryInVector(VarBinaryVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
+ byte[] byteArray = new byte[typeWidth];
+ valuesReader.getBuffer(typeWidth).get(byteArray);
+ vector.setSafe(bufferIdx, byteArray);
+ }
+
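+ // Writes the next n fixed-width values: validity bits are set one at a time, but the data
+ // bytes for the whole run are copied with a single buffer write.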
+ private void setNextNValuesInVector(
+ int typeWidth, int maxDefLevel, NullabilityHolder nullabilityHolder,
+ BytesReader valuesReader, int bufferIdx, FieldVector vector, int n) {
+ ArrowBuf validityBuffer = vector.getValidityBuffer();
+ int validityBufferIdx = bufferIdx;
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+ validityBufferIdx++;
+ }
+ ByteBuffer buffer = valuesReader.getBuffer(n * typeWidth);
+ vector.getDataBuffer().setBytes(bufferIdx * typeWidth, buffer);
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, validityBuffer);
+ }
+ }
+
+ private void setNull(NullabilityHolder nullabilityHolder, int bufferIdx, ArrowBuf validityBuffer) {
+ nullabilityHolder.setNull(bufferIdx);
+ BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+ }
+
+ private void setNulls(NullabilityHolder nullabilityHolder, int bufferIdx, int n, ArrowBuf validityBuffer) {
+ for (int i = 0; i < n; i++) {
+ nullabilityHolder.setNull(bufferIdx);
+ BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+ bufferIdx++;
+ }
+ }
+
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
similarity index 56%
rename from parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
index dbffdb0..bbaddca 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/NullabilityHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
@@ -16,41 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.parquet;
-public class NullabilityHolder {
- private final boolean[] isNull;
- private int numNulls;
+package org.apache.iceberg.parquet.vectorized;
- public NullabilityHolder(int batchSize) {
- this.isNull = new boolean[batchSize];
- }
-
-
- public void setNull(int idx) {
- isNull[idx] = true;
- numNulls++;
- }
-
- public void setNulls(int idx, int num) {
- int i = 0;
- while (i < num) {
- isNull[idx] = true;
- numNulls++;
- idx++;
- i++;
- }
- }
-
- public boolean isNullAt(int idx) {
- return isNull[idx];
- }
-
- public boolean hasNulls() {
- return numNulls > 0;
- }
-
- public int numNulls() {
- return numNulls;
- }
+/**
+ * Marker interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader {
}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
index 1d4a4ed..f8fbe5e 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
@@ -11,7 +11,6 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Map;
-import java.util.UUID;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.spark.sql.functions.*;
@@ -35,7 +34,7 @@ public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDicti
.withColumn("longCol",
when(pmod(col("id"), lit(9))
.equalTo(lit(0)), lit(0l))
- .when(expr("id > NUM_ROWS/2"), lit(UUID.randomUUID().toString()))
.when(pmod(col("id"), lit(9))
.equalTo(lit(1)), lit(1l))
.when(pmod(col("id"), lit(9))
@@ -72,8 +71,8 @@ public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDicti
.when(col("longCol")
.equalTo(lit(8)), lit("8"))
.when(col("longCol")
- .equalTo(lit(9)), lit("9"))
- .otherwise(lit(UUID.randomUUID().toString())));
+ .equalTo(lit(9)), lit("9")));
appendAsFile(df);
}
}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedFallbackToPlainEncodingStringsBenchmark.java
similarity index 66%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedFallbackToPlainEncodingStringsBenchmark.java
index 1d4a4ed..27e8ce2 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedDictionaryEncodedStringsBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedFallbackToPlainEncodingStringsBenchmark.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iceberg.spark.source.parquet.vectorized;
import com.google.common.collect.Maps;
@@ -16,7 +35,7 @@ import java.util.UUID;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.spark.sql.functions.*;
-public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDictionaryEncodedBenchmark {
+public class VectorizedFallbackToPlainEncodingStringsBenchmark extends VectorizedDictionaryEncodedBenchmark {
@Override
protected final Table initTable() {
Schema schema = new Schema(
@@ -33,48 +52,37 @@ public class VectorizedDictionaryEncodedStringsBenchmark extends VectorizedDicti
for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
Dataset<Row> df = spark().range(NUM_ROWS)
.withColumn("longCol",
- when(pmod(col("id"), lit(9))
- .equalTo(lit(0)), lit(0l))
- .when(expr("id > NUM_ROWS/2"), lit(UUID.randomUUID().toString()))
+ when(expr("id > 10000000/2"), lit(3L))
+ .when(pmod(col("id"), lit(9))
+ .equalTo(lit(0)), lit(1L))
.when(pmod(col("id"), lit(9))
.equalTo(lit(1)), lit(1l))
.when(pmod(col("id"), lit(9))
- .equalTo(lit(2)), lit(2l))
+ .equalTo(lit(2)), lit(1L))
.when(pmod(col("id"), lit(9))
- .equalTo(lit(3)), lit(3l))
+ .equalTo(lit(3)), lit(1L))
.when(pmod(col("id"), lit(9))
- .equalTo(lit(4)), lit(4l))
+ .equalTo(lit(4)), lit(1L))
.when(pmod(col("id"), lit(9))
- .equalTo(lit(5)), lit(5l))
+ .equalTo(lit(5)), lit(2L))
.when(pmod(col("id"), lit(9))
- .equalTo(lit(6)), lit(6l))
+ .equalTo(lit(6)), lit(2L))
.when(pmod(col("id"), lit(9))
- .equalTo(lit(7)), lit(7l))
+ .equalTo(lit(7)), lit(2L))
.when(pmod(col("id"), lit(9))
- .equalTo(lit(8)), lit(8l))
+ .equalTo(lit(8)), lit(2L))
.otherwise(lit(2l)))
.drop("id")
.withColumn("stringCol",
when(col("longCol")
- .equalTo(lit(1)), lit("1"))
- .when(col("longCol")
- .equalTo(lit(2)), lit("2"))
- .when(col("longCol")
- .equalTo(lit(3)), lit("3"))
+ .equalTo(lit(1L)), lit("1"))
.when(col("longCol")
- .equalTo(lit(4)), lit("4"))
+ .equalTo(lit(2L)), lit("2"))
.when(col("longCol")
- .equalTo(lit(5)), lit("5"))
- .when(col("longCol")
- .equalTo(lit(6)), lit("6"))
- .when(col("longCol")
- .equalTo(lit(7)), lit("7"))
- .when(col("longCol")
- .equalTo(lit(8)), lit("8"))
- .when(col("longCol")
- .equalTo(lit(9)), lit("9"))
+ .equalTo(lit(3L)), lit(UUID.randomUUID().toString()))
.otherwise(lit(UUID.randomUUID().toString())));
appendAsFile(df);
}
}
}
+
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
index aad8602..daee625 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
@@ -22,16 +22,13 @@ package org.apache.iceberg.spark.data.vector;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.ArrowSchemaUtil;
-import org.apache.iceberg.parquet.BatchedReader;
-import org.apache.iceberg.parquet.ColumnarBatchReader;
+import org.apache.iceberg.parquet.vectorized.VectorizedReader;
+import org.apache.iceberg.parquet.vectorized.ColumnarBatchReaders;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.parquet.VectorReader;
+import org.apache.iceberg.parquet.vectorized.VectorizedArrowReader;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.GroupType;
@@ -42,147 +39,134 @@ import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
public class VectorizedSparkParquetReaders {
- private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
-
- private VectorizedSparkParquetReaders() {
- }
-
- @SuppressWarnings("unchecked")
- public static ColumnarBatchReader buildReader(
- Schema tableSchema,
- Schema expectedSchema,
- MessageType fileSchema) {
-
- return buildReader(tableSchema, expectedSchema, fileSchema,
- VectorReader.DEFAULT_BATCH_SIZE);
- }
-
- @SuppressWarnings("unchecked")
- public static ColumnarBatchReader buildReader(
- Schema tableSchema,
- Schema expectedSchema,
- MessageType fileSchema,
- Integer recordsPerBatch) {
-
- LOG.info("=> [VectorizedSparkParquetReaders] recordsPerBatch = {}", recordsPerBatch);
- return (ColumnarBatchReader)
- TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new ReadBuilderBatched(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
- }
-
- private static class ReadBuilderBatched extends TypeWithSchemaVisitor<BatchedReader> {
- private final MessageType parquetSchema;
- private final Schema projectedIcebergSchema;
- private final Schema tableIcebergSchema;
- private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
- private final BufferAllocator rootAllocator;
- private final int recordsPerBatch;
-
- ReadBuilderBatched(
- Schema tableSchema,
- Schema projectedIcebergSchema,
- MessageType parquetSchema,
- int recordsPerBatch) {
- this.parquetSchema = parquetSchema;
- this.tableIcebergSchema = tableSchema;
- this.projectedIcebergSchema = projectedIcebergSchema;
- this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
- this.recordsPerBatch = recordsPerBatch;
- this.rootAllocator = ArrowUtils.rootAllocator().newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
- LOG.info("=> [ReadBuilder] recordsPerBatch = {}", this.recordsPerBatch);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
- @Override
- public BatchedReader message(
- Types.StructType expected, MessageType message,
- List<BatchedReader> fieldReaders) {
- return struct(expected, message.asGroupType(), fieldReaders);
+ private VectorizedSparkParquetReaders() {
}
- @Override
- public BatchedReader struct(
- Types.StructType expected, GroupType struct,
- List<BatchedReader> fieldReaders) {
-
- // this works on struct fields and the root iceberg schema which itself is a struct.
-
- // match the expected struct's order
- Map<Integer, BatchedReader> readersById = Maps.newHashMap();
- Map<Integer, Type> typesById = Maps.newHashMap();
- List<Type> fields = struct.getFields();
-
- for (int i = 0; i < fields.size(); i += 1) {
- Type fieldType = fields.get(i);
- int id = fieldType.getId().intValue();
- readersById.put(id, fieldReaders.get(i));
- typesById.put(id, fieldType);
- }
-
- List<Types.NestedField> icebergFields = expected != null ?
- expected.fields() : ImmutableList.of();
-
- List<BatchedReader> reorderedFields = Lists.newArrayListWithExpectedSize(
- icebergFields.size());
-
- List<Type> types = Lists.newArrayListWithExpectedSize(icebergFields.size());
-
- for (Types.NestedField field : icebergFields) {
- int id = field.fieldId();
- BatchedReader reader = readersById.get(id);
- if (reader != null) {
- reorderedFields.add(reader);
- types.add(typesById.get(id));
- } else {
- reorderedFields.add(null); // anjali-todo We need a NullVectorReader
- types.add(null);
- }
- }
-
- return new ColumnarBatchReader(types, expected, reorderedFields);
+ @SuppressWarnings("unchecked")
+ public static ColumnarBatchReaders buildReader(
+ Schema tableSchema,
+ Schema expectedSchema,
+ MessageType fileSchema) {
+ return buildReader(tableSchema, expectedSchema, fileSchema,
+ VectorizedArrowReader.DEFAULT_BATCH_SIZE);
}
- @Override
- public BatchedReader primitive(
- org.apache.iceberg.types.Type.PrimitiveType expected,
- PrimitiveType primitive) {
-
- // Create arrow vector for this field
- int parquetFieldId = primitive.getId().intValue();
- ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
- Types.NestedField icebergField = tableIcebergSchema.findField(parquetFieldId);
- return new VectorReader(desc, icebergField, rootAllocator, recordsPerBatch);
+ @SuppressWarnings("unchecked")
+ public static ColumnarBatchReaders buildReader(
+ Schema tableSchema,
+ Schema expectedSchema,
+ MessageType fileSchema,
+ Integer recordsPerBatch) {
+ LOG.info("=> [VectorizedSparkParquetReaders] recordsPerBatch = {}", recordsPerBatch);
+ return (ColumnarBatchReaders)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new VectorReaderBuilder(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
}
- private String[] currentPath() {
- String[] path = new String[fieldNames.size()];
- if (!fieldNames.isEmpty()) {
- Iterator<String> iter = fieldNames.descendingIterator();
- for (int i = 0; iter.hasNext(); i += 1) {
- path[i] = iter.next();
+ private static class VectorReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader> {
+ private final MessageType parquetSchema;
+ private final Schema projectedIcebergSchema;
+ private final Schema tableIcebergSchema;
+ private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
+ private final BufferAllocator rootAllocator;
+ private final int recordsPerBatch;
+
+ VectorReaderBuilder(
+ Schema tableSchema,
+ Schema projectedIcebergSchema,
+ MessageType parquetSchema,
+ int recordsPerBatch) {
+ this.parquetSchema = parquetSchema;
+ this.tableIcebergSchema = tableSchema;
+ this.projectedIcebergSchema = projectedIcebergSchema;
+ this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
+ this.recordsPerBatch = recordsPerBatch;
+ this.rootAllocator = ArrowUtils.rootAllocator().newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+ LOG.info("=> [ReadBuilder] recordsPerBatch = {}", this.recordsPerBatch);
}
- }
- return path;
- }
+ @Override
+ public VectorizedReader message(
+ Types.StructType expected, MessageType message,
+ List<VectorizedReader> fieldReaders) {
+ return struct(expected, message.asGroupType(), fieldReaders);
+ }
- protected MessageType type() {
- return parquetSchema;
- }
+ @Override
+ public VectorizedReader struct(
+ Types.StructType expected, GroupType struct,
+ List<VectorizedReader> fieldReaders) {
+
+ // this works on struct fields and the root iceberg schema which itself is a struct.
+
+ // match the expected struct's order
+ Map<Integer, VectorizedReader> readersById = Maps.newHashMap();
+ Map<Integer, Type> typesById = Maps.newHashMap();
+ List<Type> fields = struct.getFields();
+
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int id = fieldType.getId().intValue();
+ readersById.put(id, fieldReaders.get(i));
+ typesById.put(id, fieldType);
+ }
+
+ List<Types.NestedField> icebergFields = expected != null ?
+ expected.fields() : ImmutableList.of();
+
+ List<VectorizedReader> reorderedFields = Lists.newArrayListWithExpectedSize(
+ icebergFields.size());
+
+ List<Type> types = Lists.newArrayListWithExpectedSize(icebergFields.size());
+
+ for (Types.NestedField field : icebergFields) {
+ int id = field.fieldId();
+ VectorizedReader reader = readersById.get(id);
+ if (reader != null) {
+ reorderedFields.add(reader);
+ types.add(typesById.get(id));
+ } else {
+ reorderedFields.add(null); // TODO: use a NullVectorReader for fields missing from the file
+ types.add(null);
+ }
+ }
+
+ return new ColumnarBatchReaders(types, expected, reorderedFields);
+ }
+
+ @Override
+ public VectorizedReader primitive(
+ org.apache.iceberg.types.Type.PrimitiveType expected,
+ PrimitiveType primitive) {
- protected String[] path(String name) {
- String[] path = new String[fieldNames.size() + 1];
- path[fieldNames.size()] = name;
+ // Create arrow vector for this field
+ int parquetFieldId = primitive.getId().intValue();
+ ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
+ Types.NestedField icebergField = tableIcebergSchema.findField(parquetFieldId);
+ return new VectorizedArrowReader(desc, icebergField, rootAllocator, recordsPerBatch);
+ }
- if (!fieldNames.isEmpty()) {
- Iterator<String> iter = fieldNames.descendingIterator();
- for (int i = 0; iter.hasNext(); i += 1) {
- path[i] = iter.next();
+ private String[] currentPath() {
+ String[] path = new String[fieldNames.size()];
+ if (!fieldNames.isEmpty()) {
+ Iterator<String> iter = fieldNames.descendingIterator();
+ for (int i = 0; iter.hasNext(); i += 1) {
+ path[i] = iter.next();
+ }
+ }
+ return path;
}
- }
- return path;
+ protected MessageType type() {
+ return parquetSchema;
+ }
}
- }
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 5114db2..8ff0bb6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -55,7 +55,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.VectorReader;
+import org.apache.iceberg.parquet.vectorized.VectorizedArrowReader;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroReader;
@@ -127,7 +127,7 @@ class Reader implements DataSourceReader,
} else {
- this.numRecordsPerBatch = VectorReader.DEFAULT_BATCH_SIZE;
+ this.numRecordsPerBatch = VectorizedArrowReader.DEFAULT_BATCH_SIZE;
}
LOG.info("=> Set Config numRecordsPerBatch = {}", numRecordsPerBatch);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 70d176e..1c1d376 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -41,10 +41,10 @@ public abstract class AvroDataTest {
protected StructType getSupportedPrimitives() {
return StructType.of(
required(100, "id", LongType.get()),
- required(101, "data", Types.StringType.get()),
- required(102, "b", Types.BooleanType.get()),
+ optional(101, "data", Types.StringType.get()),
+ optional(102, "b", Types.BooleanType.get()),
optional(103, "i", Types.IntegerType.get()),
- required(104, "l", LongType.get()),
+ optional(104, "l", LongType.get()),
optional(105, "f", Types.FloatType.get()),
optional(106, "d", Types.DoubleType.get()),
optional(107, "date", Types.DateType.get()),
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/DictionaryData.java b/spark/src/test/java/org/apache/iceberg/spark/data/DictionaryData.java
new file mode 100644
index 0000000..74c8cda
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/DictionaryData.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Supplier;
+
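+/**
+ * Test data generator that draws every value from a small fixed set (three per type) so that
+ * Parquet writes each column with dictionary encoding, e.g.
+ * {@code DictionaryData.generateDictionaryEncodableData(schema, 100000, 0L)}.
+ */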
+public class DictionaryData {
+
+ public static List<GenericData.Record> generateDictionaryEncodableData(Schema schema, int numRecords, long seed) {
+ List<GenericData.Record> records = Lists.newArrayListWithExpectedSize(numRecords);
+ DictionaryDataGenerator dictionaryDataGenerator = new DictionaryDataGenerator(schema, seed);
+ for (int i = 0; i < numRecords; i += 1) {
+ GenericData.Record rec = (GenericData.Record) TypeUtil.visit(schema, dictionaryDataGenerator);
+ records.add(rec);
+ }
+ return records;
+ }
+
+ private static class DictionaryDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> {
+ private final Map<Type, org.apache.avro.Schema> typeToSchema;
+ private final Random random;
+
+ private DictionaryDataGenerator(Schema schema, long seed) {
+ this.typeToSchema = AvroSchemaUtil.convertTypes(schema.asStruct(), "test");
+ this.random = new Random(seed);
+ }
+
+ @Override
+ public GenericData.Record schema(Schema schema, Supplier<Object> structResult) {
+ return (GenericData.Record) structResult.get();
+ }
+
+ @Override
+ public GenericData.Record struct(Types.StructType struct, Iterable<Object> fieldResults) {
+ GenericData.Record rec = new GenericData.Record(typeToSchema.get(struct));
+
+ List<Object> values = Lists.newArrayList(fieldResults);
+ for (int i = 0; i < values.size(); i += 1) {
+ rec.put(i, values.get(i));
+ }
+
+ return rec;
+ }
+
+ @Override
+ public Object field(Types.NestedField field, Supplier<Object> fieldResult) {
+ // return null 5% of the time when the value is optional
+ if (field.isOptional() && random.nextInt(20) == 1) {
+ return null;
+ }
+ return fieldResult.get();
+ }
+
+ @Override
+ public Object list(Types.ListType list, Supplier<Object> elementResult) {
+ int numElements = random.nextInt(20);
+
+ List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
+ for (int i = 0; i < numElements; i += 1) {
+ // return null 5% of the time when the value is optional
+ if (list.isElementOptional() && random.nextInt(20) == 1) {
+ result.add(null);
+ } else {
+ result.add(elementResult.get());
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public Object map(Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
+ int numEntries = random.nextInt(20);
+
+ Map<Object, Object> result = Maps.newLinkedHashMap();
+ Set<Object> keySet = Sets.newHashSet();
+ for (int i = 0; i < numEntries; i += 1) {
+ Object key = keyResult.get();
+ // ensure no collisions
+ while (keySet.contains(key)) {
+ key = keyResult.get();
+ }
+
+ keySet.add(key);
+
+ // return null 5% of the time when the value is optional
+ if (map.isValueOptional() && random.nextInt(20) == 1) {
+ result.put(key, null);
+ } else {
+ result.put(key, valueResult.get());
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public Object primitive(Type.PrimitiveType primitive) {
+ Object result = generatePrimitive(primitive, random);
+ // For the primitives that Avro needs a different type than Spark, fix
+ // them here.
+ switch (primitive.typeId()) {
+ case STRING:
+ return ((UTF8String) result).toString();
+ case FIXED:
+ return new GenericData.Fixed(typeToSchema.get(primitive),
+ (byte[]) result);
+ case BINARY:
+ return ByteBuffer.wrap((byte[]) result);
+ case UUID:
+ return UUID.nameUUIDFromBytes((byte[]) result);
+ case DECIMAL:
+ return ((Decimal) result).toJavaBigDecimal();
+ default:
+ return result;
+ }
+ }
+ }
+
+ private static Object generatePrimitive(Type.PrimitiveType primitive,
+ Random random) {
+ // pick one of three fixed values per type so every column stays dictionary-encodable
+ int choice = random.nextInt(3);
+ switch (primitive.typeId()) {
+ case BOOLEAN:
+ return true; // the value is irrelevant: Parquet does not dictionary-encode booleans
+
+ case INTEGER:
+ switch (choice) {
+ case 0:
+ return 0;
+ case 1:
+ return 1;
+ case 2:
+ return 2;
+ }
+
+ case LONG:
+ switch (choice) {
+ case 0:
+ return 0L;
+ case 1:
+ return 1L;
+ case 2:
+ return 2L;
+ }
+
+ case FLOAT:
+ switch (choice) {
+ case 0:
+ return 0.0f;
+ case 1:
+ return 1.0f;
+ case 2:
+ return 2.0f;
+ }
+
+ case DOUBLE:
+ switch (choice) {
+ case 0:
+ return 0.0d;
+ case 1:
+ return 1.0d;
+ case 2:
+ return 2.0d;
+ }
+
+ case DATE:
+ switch (choice) {
+ case 0:
+ return 0;
+ case 1:
+ return 1;
+ case 2:
+ return 2;
+ }
+
+ case TIME:
+ switch (choice) {
+ case 0:
+ return 0L;
+ case 1:
+ return 1L;
+ case 2:
+ return 2L;
+ }
+
+ case TIMESTAMP:
+ switch (choice) {
+ case 0:
+ return 0L;
+ case 1:
+ return 1L;
+ case 2:
+ return 2L;
+ }
+
+ case STRING:
+ switch (choice) {
+ case 0:
+ return UTF8String.fromString("0");
+ case 1:
+ return UTF8String.fromString("1");
+ case 2:
+ return UTF8String.fromString("2");
+ }
+
+ case FIXED:
+ byte[] fixed = new byte[((Types.FixedType) primitive).length()];
+ switch (choice) {
+ case 0:
+ fixed[0] = 0;
+ return fixed;
+ case 1:
+ fixed[0] = 1;
+ return fixed;
+ case 2:
+ fixed[0] = 2;
+ return fixed;
+ }
+
+ case BINARY:
+ byte[] binary = new byte[4];
+ switch (choice) {
+ case 0:
+ binary[0] = 0;
+ return binary;
+ case 1:
+ binary[0] = 1;
+ return binary;
+ case 2:
+ binary[0] = 2;
+ return binary;
+ }
+
+ case DECIMAL:
+ Types.DecimalType type = (Types.DecimalType) primitive;
+ switch (choice) {
+ case 0:
+ BigInteger unscaled = new BigInteger("1");
+ return Decimal.apply(new BigDecimal(unscaled, type.scale()));
+ case 1:
+ unscaled = new BigInteger("2");
+ return Decimal.apply(new BigDecimal(unscaled, type.scale()));
+ case 2:
+ unscaled = new BigInteger("3");
+ return Decimal.apply(new BigDecimal(unscaled, type.scale()));
+ }
+
+
+ throw new IllegalArgumentException(
+ "Cannot generate random value for unknown type: " + primitive);
+ }
+ }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 928ec8a..35bb5c7 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -33,9 +33,12 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+
+import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.serde2.io.DateWritable;
@@ -53,6 +56,7 @@ import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
@@ -218,11 +222,6 @@ public class TestHelpers {
Type fieldType = fields.get(i).type();
Object expectedValue = expRec.get(i);
- // System.out.println(" -> Checking Row "+r+", field #"+i
- // + " , Field:"+ fields.get(i).name()
- // + " , optional:"+fields.get(i).isOptional()
- // + " , type:"+fieldType.typeId()
- // + " , expected:"+expectedValue);
if (actualRow.isNullAt(i)) {
Assert.assertTrue("Expect null at " + r, expectedValue == null);
} else {
@@ -233,6 +232,30 @@ public class TestHelpers {
}
}
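+ /**
+ * Like assertEqualsUnsafe, but also checks that the validity bits of the underlying Arrow
+ * vector agree with the expected nulls.
+ */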
+ public static void assertArrowVectors(Types.StructType struct, List<Record> expected, ColumnarBatch batch) {
+ List<Types.NestedField> fields = struct.fields();
+ for (int r = 0; r < batch.numRows(); r++) {
+ Record expRec = expected.get(r);
+ InternalRow actualRow = batch.getRow(r);
+ for (int i = 0; i < fields.size(); i += 1) {
+ ColumnVector vector = batch.column(i);
+ Assert.assertTrue(vector instanceof IcebergArrowColumnVector);
+ IcebergArrowColumnVector.ArrowVectorAccessor accessor = ((IcebergArrowColumnVector) vector).getAccessor();
+ ValueVector arrowVector = accessor.getUnderlyingArrowVector();
+ Type fieldType = fields.get(i).type();
+ Object expectedValue = expRec.get(i);
+ if (actualRow.isNullAt(i)) {
+ Assert.assertTrue("Expect null at " + r, expectedValue == null);
+ Assert.assertTrue("Expected the value to be set as null in the arrow vector", arrowVector.isNull(r));
+ } else {
+ Object actualValue = actualRow.get(i, convert(fieldType));
+ Assert.assertFalse("Expected the value to be set as non-null in the arrow vector", arrowVector.isNull(r));
+ assertEqualsUnsafe(fieldType, expectedValue, actualValue);
+ }
+ }
+ }
+ }
+
private static void assertEqualsUnsafe(Types.ListType list, Collection<?> expected, ArrayData actual) {
Type elementType = list.elementType();
List<?> expectedElements = Lists.newArrayList(expected);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetDictionaryEncodedVectorizedReader.java
similarity index 67%
copy from spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
copy to spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetDictionaryEncodedVectorizedReader.java
index 410d5e3..e494b98 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetDictionaryEncodedVectorizedReader.java
@@ -14,25 +14,15 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-public class TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader extends TestSparkParquetVectorizedReader {
-
- @Override
- protected Types.StructType getSupportedPrimitives() {
- return Types.StructType.of(
- required(100, "id", Types.LongType.get()),
- required(101, "data", Types.StringType.get()));
- }
+public class TestSparkParquetDictionaryEncodedVectorizedReader extends TestSparkParquetVectorizedReader {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
- setupArrowFlags();
// Write test data
Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
- List<GenericData.Record> expected = RandomData.generateListWithFallBackDictionaryEncodingForStrings(schema, 1000000, 0L, 0.5f);
+ List<GenericData.Record> expected = DictionaryData.generateDictionaryEncodableData(schema, 100000, 0L);
// write a test parquet file using iceberg writer
File testFile = temp.newFile();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
index 410d5e3..fdb6f4d 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader.java
@@ -27,7 +27,6 @@ public class TestSparkParquetFallbackToDictionaryEncodingForVectorizedReader ext
@Override
protected void writeAndValidate(Schema schema) throws IOException {
- setupArrowFlags();
// Write test data
Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
index a1697e4..b14a55f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
@@ -36,16 +36,23 @@ import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.junit.Assert;
import org.junit.Assume;
+import org.junit.Before;
import org.junit.Test;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.spark.data.TestHelpers.assertArrowVectors;
import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
public class TestSparkParquetVectorizedReader extends AvroDataTest {
+ @Before
+ public void setupArrowFlags() {
+ System.setProperty("arrow.enable_unsafe_memory_access", "true");
+ System.setProperty("arrow.enable_null_check_for_get", "false");
+ }
+
@Override
protected void writeAndValidate(Schema schema) throws IOException {
- setupArrowFlags();
// Write test data
Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
@@ -83,18 +90,10 @@ public class TestSparkParquetVectorizedReader extends AvroDataTest {
for (int i = numExpectedRead; i < numExpectedRead + batch.numRows(); i++) {
expectedBatch.add(expected.get(i));
}
-
- // System.out.println("-> Check "+numExpectedRead+" - "+ (numExpectedRead+batch.numRows()));
- assertEqualsUnsafe(schema.asStruct(), expectedBatch, batch);
-
- System.out.println("Batch read with " + batch.numRows() + " rows. Read " + numRowsRead + " till now. " +
- "Expected batch " + expectedBatch.size());
-
+ assertArrowVectors(schema.asStruct(), expectedBatch, batch);
numExpectedRead += batch.numRows();
}
-
Assert.assertEquals(expected.size(), numRowsRead);
-
}
}
@@ -133,8 +132,4 @@ public class TestSparkParquetVectorizedReader extends AvroDataTest {
System.out.println("Not Supported");
}
- void setupArrowFlags() {
- System.setProperty("arrow.enable_unsafe_memory_access", "true");
- System.setProperty("arrow.enable_null_check_for_get", "false");
- }
}