Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/22 00:36:48 UTC
[incubator-iceberg] branch vectorized-read updated: Fix various checkstyle issues (#667)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/vectorized-read by this push:
new ecc9c8d Fix various checkstyle issues (#667)
ecc9c8d is described below
commit ecc9c8dccd0d1f0c86fa8b1814b168ed6a480050
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Thu Nov 21 16:36:38 2019 -0800
Fix various checkstyle issues (#667)
---
build.gradle | 10 +-
gradle/wrapper/gradle-wrapper.properties | 24 +-
.../org/apache/iceberg/parquet/ParquetUtil.java | 270 ++-
.../{BytesReader.java => ValuesAsBytesReader.java} | 48 +-
.../iceberg/parquet/arrow/ArrowSchemaUtil.java | 147 ++
.../arrow/IcebergArrowColumnVector.java | 100 +-
.../arrow/IcebergDecimalArrowVector.java | 8 +-
.../arrow/IcebergVarBinaryArrowVector.java | 6 +-
.../arrow/IcebergVarcharArrowVector.java | 8 +-
.../parquet/vectorized/ColumnarBatchReaders.java | 112 +-
.../parquet/vectorized/NullabilityHolder.java | 16 +-
.../iceberg/parquet/vectorized/VectorHolder.java | 61 +-
.../parquet/vectorized/VectorizedArrowReader.java | 56 +-
.../vectorized/VectorizedColumnIterator.java | 54 +-
.../parquet/vectorized/VectorizedPageIterator.java | 856 +++----
.../vectorized/VectorizedParquetValuesReader.java | 2334 +++++++++++---------
.../org/apache/iceberg/spark/data/TestHelpers.java | 11 +-
versions.props | 1 +
18 files changed, 2275 insertions(+), 1847 deletions(-)
diff --git a/build.gradle b/build.gradle
index 8556cf3..9071a61 100644
--- a/build.gradle
+++ b/build.gradle
@@ -252,6 +252,12 @@ project(':iceberg-core') {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
+ compile("org.apache.arrow:arrow-vector") {
+ exclude group: 'io.netty', module: 'netty-buffer'
+ exclude group: 'io.netty', module: 'netty-common'
+ }
+
+ testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
}
}
@@ -264,7 +270,6 @@ project(':iceberg-data') {
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
}
-
testCompile("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
@@ -346,6 +351,9 @@ project(':iceberg-parquet') {
compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
}
+ compileOnly("org.apache.spark:spark-hive_2.11") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ }
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
}
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 1277f7d..768a708 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,24 +1,6 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
+#Wed Nov 20 14:10:01 PST 2019
+distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip
+zipStoreBase=GRADLE_USER_HOME
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 4cab3d9..dbfeb41 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -22,6 +22,15 @@ package org.apache.iceberg.parquet;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
@@ -33,8 +42,9 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.UnicodeUtil;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.statistics.Statistics;
@@ -47,18 +57,6 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import static org.apache.iceberg.parquet.ParquetConversions.fromParquetPrimitive;
-import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMax;
-import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMin;
-import static org.apache.iceberg.util.UnicodeUtil.truncateStringMax;
-import static org.apache.iceberg.util.UnicodeUtil.truncateStringMin;
-import static org.apache.parquet.schema.OriginalType.*;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
-
public class ParquetUtil {
// not meant to be instantiated
private ParquetUtil() {
@@ -98,7 +96,7 @@ public class ParquetUtil {
increment(columnSizes, fieldId, column.getTotalSize());
String columnName = fileSchema.findColumnName(fieldId);
- MetricsMode metricsMode = metricsConfig.columnMode(columnName);
+ MetricsModes.MetricsMode metricsMode = metricsConfig.columnMode(columnName);
if (metricsMode == MetricsModes.None.get()) {
continue;
}
@@ -113,9 +111,9 @@ public class ParquetUtil {
if (metricsMode != MetricsModes.Counts.get()) {
Types.NestedField field = fileSchema.findField(fieldId);
if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) {
- Literal<?> min = fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin());
+ Literal<?> min = ParquetConversions.fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin());
updateMin(lowerBounds, fieldId, field.type(), min, metricsMode);
- Literal<?> max = fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax());
+ Literal<?> max = ParquetConversions.fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax());
updateMax(upperBounds, fieldId, field.type(), max, metricsMode);
}
}
@@ -134,10 +132,8 @@ public class ParquetUtil {
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
}
-
/**
- * @return a list of offsets in ascending order determined by the starting position
- * of the row groups
+ * @return a list of offsets in ascending order determined by the starting position of the row groups
*/
public static List<Long> getSplitOffsets(ParquetMetadata md) {
List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
@@ -175,8 +171,9 @@ public class ParquetUtil {
}
@SuppressWarnings("unchecked")
- private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type,
- Literal<T> min, MetricsMode metricsMode) {
+ private static <T> void updateMin(
+ Map<Integer, Literal<?>> lowerBounds, int id, Type type,
+ Literal<T> min, MetricsMode metricsMode) {
Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
if (metricsMode == MetricsModes.Full.get()) {
@@ -186,11 +183,11 @@ public class ParquetUtil {
int truncateLength = truncateMode.length();
switch (type.typeId()) {
case STRING:
- lowerBounds.put(id, truncateStringMin((Literal<CharSequence>) min, truncateLength));
+ lowerBounds.put(id, UnicodeUtil.truncateStringMin((Literal<CharSequence>) min, truncateLength));
break;
case FIXED:
case BINARY:
- lowerBounds.put(id, truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
+ lowerBounds.put(id, BinaryUtil.truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
break;
default:
lowerBounds.put(id, min);
@@ -200,8 +197,9 @@ public class ParquetUtil {
}
@SuppressWarnings("unchecked")
- private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type,
- Literal<T> max, MetricsMode metricsMode) {
+ private static <T> void updateMax(
+ Map<Integer, Literal<?>> upperBounds, int id, Type type,
+ Literal<T> max, MetricsMode metricsMode) {
Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
if (metricsMode == MetricsModes.Full.get()) {
@@ -211,11 +209,11 @@ public class ParquetUtil {
int truncateLength = truncateMode.length();
switch (type.typeId()) {
case STRING:
- upperBounds.put(id, truncateStringMax((Literal<CharSequence>) max, truncateLength));
+ upperBounds.put(id, UnicodeUtil.truncateStringMax((Literal<CharSequence>) max, truncateLength));
break;
case FIXED:
case BINARY:
- upperBounds.put(id, truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
+ upperBounds.put(id, BinaryUtil.truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
break;
default:
upperBounds.put(id, max);
@@ -227,148 +225,132 @@ public class ParquetUtil {
private static Map<Integer, ByteBuffer> toBufferMap(Schema schema, Map<Integer, Literal<?>> map) {
Map<Integer, ByteBuffer> bufferMap = Maps.newHashMap();
for (Map.Entry<Integer, Literal<?>> entry : map.entrySet()) {
- bufferMap.put(entry.getKey(),
+ bufferMap.put(
+ entry.getKey(),
Conversions.toByteBuffer(schema.findType(entry.getKey()), entry.getValue().value()));
}
return bufferMap;
}
- public static boolean isFixedLengthDecimal(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- return (primitive.getOriginalType() != null &&
- primitive.getOriginalType() == DECIMAL &&
- (primitive.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY ||
- primitive.getPrimitiveTypeName() == BINARY));
- }
-
- public static boolean isIntLongBackedDecimal(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- return (primitive.getOriginalType() != null &&
- primitive.getOriginalType() == DECIMAL &&
- (primitive.getPrimitiveTypeName() == INT64 ||
- primitive.getPrimitiveTypeName() == INT32));
- }
-
- public static boolean isVarWidthType(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- if (originalType != null &&
- originalType != DECIMAL &&
- (originalType == ENUM ||
- originalType == OriginalType.JSON ||
- originalType == UTF8 ||
- originalType == BSON)) {
- return true;
+ public static boolean isFixedLengthDecimal(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ return primitive.getOriginalType() != null &&
+ primitive.getOriginalType() == OriginalType.DECIMAL &&
+ (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ||
+ primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY);
}
- if (originalType == null && primitive.getPrimitiveTypeName() == BINARY) {
- return true;
- }
- return false;
- }
- public static boolean isBooleanType(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- return (originalType == null && primitive.getPrimitiveTypeName() == BOOLEAN);
- }
+ public static boolean isIntLongBackedDecimal(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ return primitive.getOriginalType() != null &&
+ primitive.getOriginalType() == OriginalType.DECIMAL &&
+ (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64 ||
+ primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32);
+ }
- public static boolean isFixedWidthBinary(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- if (originalType == null && primitive.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY) {
- return true;
+ public static boolean isVarWidthType(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ OriginalType originalType = primitive.getOriginalType();
+ if (originalType != null &&
+ originalType != OriginalType.DECIMAL &&
+ (originalType == OriginalType.ENUM ||
+ originalType == OriginalType.JSON ||
+ originalType == OriginalType.UTF8 ||
+ originalType == OriginalType.BSON)) {
+ return true;
+ }
+ if (originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
+ return true;
+ }
+ return false;
}
- return false;
- }
- public static boolean isNumericNonDecimalType(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- if (originalType != null) {
- if (originalType == INT_8 || originalType == INT_16 || originalType == INT_32
- || originalType == DATE || originalType == INT_64 || originalType == TIMESTAMP_MILLIS
- || originalType == TIMESTAMP_MICROS) {
- return true;
- }
- } else {
- PrimitiveType.PrimitiveTypeName primitiveTypeName = primitive.getPrimitiveTypeName();
- if (primitiveTypeName == INT64 || primitiveTypeName == INT32 || primitiveTypeName == FLOAT ||
- primitiveTypeName == DOUBLE) {
- return true;
- }
+ public static boolean isBooleanType(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ OriginalType originalType = primitive.getOriginalType();
+ return originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BOOLEAN;
}
- return false;
- }
- public static boolean isIntType(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- if (originalType != null && (originalType == INT_8 || originalType == INT_16 || originalType == INT_32 || originalType == DATE)) {
- return true;
- } else if (primitive.getPrimitiveTypeName() == INT32) {
- return true;
+ public static boolean isFixedWidthBinary(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ OriginalType originalType = primitive.getOriginalType();
+ if (originalType == null &&
+ primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ return true;
+ }
+ return false;
}
- return false;
- }
- public static boolean isLongType(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- if (originalType != null && (originalType == INT_64 || originalType == TIMESTAMP_MILLIS || originalType == TIMESTAMP_MICROS)) {
- return true;
- } else if (primitive.getPrimitiveTypeName() == INT64) {
- return true;
+ public static boolean isIntType(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ OriginalType originalType = primitive.getOriginalType();
+ if (originalType != null && (originalType == OriginalType.INT_8 || originalType == OriginalType.INT_16 ||
+ originalType == OriginalType.INT_32 || originalType == OriginalType.DATE)) {
+ return true;
+ } else if (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+ return true;
+ }
+ return false;
}
- return false;
- }
- public static boolean isDoubleType(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- if (originalType == null && primitive.getPrimitiveTypeName() == DOUBLE) {
- return true;
+ public static boolean isLongType(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ OriginalType originalType = primitive.getOriginalType();
+ if (originalType != null && (originalType == OriginalType.INT_64 || originalType == OriginalType.TIMESTAMP_MILLIS ||
+ originalType == OriginalType.TIMESTAMP_MICROS)) {
+ return true;
+ } else if (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) {
+ return true;
+ }
+ return false;
}
- return false;
- }
- public static boolean isFloatType(ColumnDescriptor desc) {
- PrimitiveType primitive = desc.getPrimitiveType();
- OriginalType originalType = primitive.getOriginalType();
- if (originalType == null && primitive.getPrimitiveTypeName() == FLOAT) {
- return true;
+ public static boolean isDoubleType(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ OriginalType originalType = primitive.getOriginalType();
+ if (originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
+ return true;
+ }
+ return false;
}
- return false;
- }
- @SuppressWarnings("deprecation")
- public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
- EncodingStats stats = meta.getEncodingStats();
- if (stats != null) {
- return stats.hasNonDictionaryEncodedPages();
+ public static boolean isFloatType(ColumnDescriptor desc) {
+ PrimitiveType primitive = desc.getPrimitiveType();
+ OriginalType originalType = primitive.getOriginalType();
+ if (originalType == null && primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+ return true;
+ }
+ return false;
}
- // without EncodingStats, fall back to testing the encoding list
- Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
- if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
- // if remove returned true, PLAIN_DICTIONARY was present, which means at
- // least one page was dictionary encoded and 1.0 encodings are used
+ @SuppressWarnings("deprecation")
+ public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
+ EncodingStats stats = meta.getEncodingStats();
+ if (stats != null) {
+ return stats.hasNonDictionaryEncodedPages();
+ }
- // RLE and BIT_PACKED are only used for repetition or definition levels
- encodings.remove(Encoding.RLE);
- encodings.remove(Encoding.BIT_PACKED);
+ // without EncodingStats, fall back to testing the encoding list
+ Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
+ if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
+ // if remove returned true, PLAIN_DICTIONARY was present, which means at
+ // least one page was dictionary encoded and 1.0 encodings are used
- if (encodings.isEmpty()) {
- return false; // no encodings other than dictionary or rep/def levels
- }
+ // RLE and BIT_PACKED are only used for repetition or definition levels
+ encodings.remove(Encoding.RLE);
+ encodings.remove(Encoding.BIT_PACKED);
- return true;
+ if (encodings.isEmpty()) {
+ return false; // no encodings other than dictionary or rep/def levels
+ }
- } else {
- // if PLAIN_DICTIONARY wasn't present, then either the column is not
- // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
- // for 2.0, this cannot determine whether a page fell back without
- // page encoding stats
- return true;
+ return true;
+ } else {
+ // if PLAIN_DICTIONARY wasn't present, then either the column is not
+ // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
+ // for 2.0, this cannot determine whether a page fell back without
+ // page encoding stats
+ return true;
+ }
}
- }
}
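
For context on the hunk above: when a column chunk has no EncodingStats, hasNonDictionaryPages can only inspect the encoding list. A minimal sketch of that fallback, mirroring the logic above (method name hypothetical):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.parquet.column.Encoding;

    // Strip the encodings that don't describe values, then see whether
    // anything other than dictionary encoding remains.
    static boolean nonDictionaryFallback(Set<Encoding> columnEncodings) {
      Set<Encoding> encodings = new HashSet<>(columnEncodings);
      if (!encodings.remove(Encoding.PLAIN_DICTIONARY)) {
        // no PLAIN_DICTIONARY: either the column is not dictionary-encoded,
        // or the 2.0 RLE_DICTIONARY encoding was used; assume pages fell back
        return true;
      }
      // RLE and BIT_PACKED only encode repetition/definition levels
      encodings.remove(Encoding.RLE);
      encodings.remove(Encoding.BIT_PACKED);
      return !encodings.isEmpty(); // any leftover encoding means a non-dictionary page
    }
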
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
similarity index 50%
rename from parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
index 6bc4756..6100979 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BytesReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java
@@ -1,18 +1,20 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.iceberg.parquet;
@@ -20,27 +22,26 @@ package org.apache.iceberg.parquet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import org.apache.arrow.vector.BigIntVector;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
/**
- * Implements a {@link ValuesReader} specifically to read given number of bytes from the underlying
- * {@link ByteBufferInputStream}.
+ * Implements a {@link ValuesReader} specifically to read a given number of bytes from the underlying {@link
+ * ByteBufferInputStream}.
*/
-public class BytesReader extends ValuesReader {
- private ByteBufferInputStream in = null;
+public class ValuesAsBytesReader extends ValuesReader {
+ private ByteBufferInputStream valuesInputStream = null;
// Only used for booleans.
private int bitOffset;
private byte currentByte = 0;
- public BytesReader() {
+ public ValuesAsBytesReader() {
}
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) {
- this.in = in;
+ this.valuesInputStream = in;
}
@Override
@@ -50,7 +51,7 @@ public class BytesReader extends ValuesReader {
public ByteBuffer getBuffer(int length) {
try {
- return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+ return valuesInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
}
@@ -67,21 +68,20 @@ public class BytesReader extends ValuesReader {
currentByte = getByte();
}
- boolean v = (currentByte & (1 << bitOffset)) != 0;
+ boolean value = (currentByte & (1 << bitOffset)) != 0;
bitOffset += 1;
if (bitOffset == 8) {
bitOffset = 0;
}
- return v;
+ return value;
}
private byte getByte() {
try {
- return (byte) in.read();
+ return (byte) valuesInputStream.read();
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read a byte", e);
}
}
-
}
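
A hypothetical usage sketch for the renamed reader (helper name and setup assumed, not part of this commit): rather than decoding value by value, it hands back a little-endian slice of the page.

    import java.nio.ByteBuffer;
    import org.apache.parquet.bytes.ByteBufferInputStream;

    // Slice valueCount INT32 values straight out of the page stream; the
    // buffer returned by getBuffer() is already ordered LITTLE_ENDIAN.
    static int firstIntOnPage(int valueCount, ByteBufferInputStream pageStream) {
      ValuesAsBytesReader reader = new ValuesAsBytesReader();
      reader.initFromPage(valueCount, pageStream);
      ByteBuffer values = reader.getBuffer(valueCount * 4); // 4 bytes per INT32
      return values.getInt(0);
    }
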
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/arrow/ArrowSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/ArrowSchemaUtil.java
new file mode 100644
index 0000000..1e7c4d4
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/ArrowSchemaUtil.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.arrow;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+
+public class ArrowSchemaUtil {
+ static final String ORIGINAL_TYPE = "originalType";
+ static final String MAP_TYPE = "mapType";
+ static final String MAP_KEY = "key";
+ static final String MAP_VALUE = "value";
+
+ private ArrowSchemaUtil() { }
+
+ /**
+ * Convert Iceberg schema to Arrow Schema.
+ *
+ * @param schema iceberg schema
+ * @return arrow schema
+ */
+ public static Schema convert(final org.apache.iceberg.Schema schema) {
+ final ImmutableList.Builder<Field> fields = ImmutableList.builder();
+
+ for (NestedField f : schema.columns()) {
+ fields.add(convert(f));
+ }
+
+ return new Schema(fields.build());
+ }
+
+ public static Field convert(final NestedField field) {
+ final ArrowType arrowType;
+
+ final List<Field> children = Lists.newArrayList();
+ Map<String, String> metadata = null;
+
+ switch (field.type().typeId()) {
+ case BINARY:
+ // Spark doesn't support BYTE(fixed_size) type, so cast it to VarBinary
+ case FIXED:
+ arrowType = ArrowType.Binary.INSTANCE;
+ break;
+ case BOOLEAN:
+ arrowType = ArrowType.Bool.INSTANCE;
+ break;
+ case INTEGER:
+ arrowType = new ArrowType.Int(Integer.SIZE, true);
+ break;
+ case LONG:
+ arrowType = new ArrowType.Int(Long.SIZE, true);
+ break;
+ case FLOAT:
+ arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ break;
+ case DOUBLE:
+ arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ break;
+ case DECIMAL:
+ final Types.DecimalType decimalType = (Types.DecimalType) field.type();
+ arrowType = new ArrowType.Decimal(decimalType.precision(), decimalType.scale());
+ break;
+ case STRING:
+ arrowType = ArrowType.Utf8.INSTANCE;
+ break;
+ case TIME:
+ arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
+ break;
+ case TIMESTAMP:
+ arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
+ break;
+ case DATE:
+ arrowType = new ArrowType.Date(DateUnit.DAY);
+ break;
+ case STRUCT:
+ final StructType struct = field.type().asStructType();
+ arrowType = ArrowType.Struct.INSTANCE;
+
+ for (NestedField nested : struct.fields()) {
+ children.add(convert(nested));
+ }
+ break;
+ case LIST:
+ final ListType listType = field.type().asListType();
+ arrowType = ArrowType.List.INSTANCE;
+
+ for (NestedField nested : listType.fields()) {
+ children.add(convert(nested));
+ }
+ break;
+ case MAP:
+ // Maps are represented as List<Struct<key, value>>
+ metadata = ImmutableMap.of(ORIGINAL_TYPE, MAP_TYPE);
+ final MapType mapType = field.type().asMapType();
+ arrowType = ArrowType.List.INSTANCE;
+
+ final List<Field> entryFields = Lists.newArrayList(
+ convert(required(0, MAP_KEY, mapType.keyType())),
+ convert(optional(0, MAP_VALUE, mapType.valueType()))
+ );
+
+ final Field entry = new Field("",
+ new FieldType(true, new ArrowType.Struct(), null), entryFields);
+ children.add(entry);
+ break;
+ default: throw new UnsupportedOperationException("Unsupported field type: " + field);
+ }
+
+ return new Field(field.name(), new FieldType(field.isOptional(), arrowType, null, metadata), children);
+ }
+}
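
A short usage sketch for the new converter (the two-column schema is invented for illustration):

    import org.apache.iceberg.parquet.arrow.ArrowSchemaUtil;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;
    import static org.apache.iceberg.types.Types.NestedField.required;

    // LONG maps to ArrowType.Int(64, signed) and STRING to ArrowType.Utf8
    // per the switch above.
    static org.apache.arrow.vector.types.pojo.Schema toArrow() {
      org.apache.iceberg.Schema iceberg = new org.apache.iceberg.Schema(
          required(1, "id", Types.LongType.get()),
          optional(2, "name", Types.StringType.get()));
      return ArrowSchemaUtil.convert(iceberg);
    }
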
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
similarity index 87%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
index 7b30948..214a9a4 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
@@ -17,11 +17,22 @@
* under the License.
*/
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
-import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.*;
+import java.math.BigInteger;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
@@ -41,12 +52,9 @@ import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
-import java.math.BigInteger;
-
/**
- * Implementation of Spark's {@link ColumnVector} interface. The main purpose
- * of this class is to prevent the expensive nullability checks made by Spark's
- * {@link ArrowColumnVector} implementation by delegating those calls to the
+ * Implementation of Spark's {@link ColumnVector} interface. The main purpose of this class is to prevent the expensive
+ * nullability checks made by Spark's {@link ArrowColumnVector} implementation by delegating those calls to
* Iceberg's {@link NullabilityHolder}.
*/
@@ -68,7 +76,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
this.accessor = getVectorAccessor(columnDescriptor, holder.getVector());
}
- @VisibleForTesting
+ // public for testing purposes only
public ArrowVectorAccessor getAccessor() {
return accessor;
}
@@ -173,9 +181,11 @@ public class IcebergArrowColumnVector extends ColumnVector {
}
@Override
- public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
+ public ArrowColumnVector getChild(int ordinal) {
+ return childColumns[ordinal];
+ }
- @VisibleForTesting
+ // public for testing purposes only
public abstract class ArrowVectorAccessor {
private final ValueVector vector;
@@ -237,12 +247,13 @@ public class IcebergArrowColumnVector extends ColumnVector {
throw new UnsupportedOperationException();
}
- @VisibleForTesting
+ // public for testing purposes only
public ValueVector getUnderlyingArrowVector() {
return vector;
}
}
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
private ArrowVectorAccessor getVectorAccessor(ColumnDescriptor desc, ValueVector vector) {
PrimitiveType primitive = desc.getPrimitiveType();
if (isVectorDictEncoded) {
@@ -268,18 +279,27 @@ public class IcebergArrowColumnVector extends ColumnVector {
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
- return new DictionaryDecimalBinaryAccessor((IntVector) vector, decimal.getPrecision(), decimal.getScale());
+ return new DictionaryDecimalBinaryAccessor(
+ (IntVector) vector,
+ decimal.getPrecision(),
+ decimal.getScale());
case INT64:
- return new DictionaryDecimalLongAccessor((IntVector) vector, decimal.getPrecision(), decimal.getScale());
+ return new DictionaryDecimalLongAccessor(
+ (IntVector) vector,
+ decimal.getPrecision(),
+ decimal.getScale());
case INT32:
- return new DictionaryDecimalIntAccessor((IntVector) vector, decimal.getPrecision(), decimal.getScale());
+ return new DictionaryDecimalIntAccessor(
+ (IntVector) vector,
+ decimal.getPrecision(),
+ decimal.getScale());
default:
throw new UnsupportedOperationException(
- "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+ "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
default:
throw new UnsupportedOperationException(
- "Unsupported logical type: " + primitive.getOriginalType());
+ "Unsupported logical type: " + primitive.getOriginalType());
}
} else {
switch (primitive.getPrimitiveTypeName()) {
@@ -290,10 +310,10 @@ public class IcebergArrowColumnVector extends ColumnVector {
return new DictionaryIntAccessor((IntVector) vector);
case FLOAT:
return new DictionaryFloatAccessor((IntVector) vector);
-// case BOOLEAN:
-// this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
-// ((BitVector) vec).allocateNew(batchSize);
-// return UNKNOWN_WIDTH;
+ // case BOOLEAN:
+ // this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+ // ((BitVector) vec).allocateNew(batchSize);
+ // return UNKNOWN_WIDTH;
case INT64:
return new DictionaryLongAccessor((IntVector) vector);
case DOUBLE:
@@ -332,12 +352,12 @@ public class IcebergArrowColumnVector extends ColumnVector {
return new ArrayAccessor(listVector);
} else if (vector instanceof StructVector) {
StructVector structVector = (StructVector) vector;
- ArrowVectorAccessor accessor = new StructAccessor(structVector);
+ ArrowVectorAccessor structAccessor = new StructAccessor(structVector);
childColumns = new ArrowColumnVector[structVector.size()];
for (int i = 0; i < childColumns.length; ++i) {
childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
}
- return accessor;
+ return structAccessor;
}
}
throw new UnsupportedOperationException();
@@ -519,7 +539,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
- if (isNullAt(rowId)) return null;
+ if (isNullAt(rowId)) {
+ return null;
+ }
return Decimal.apply(vector.getObject(rowId), precision, scale);
}
}
@@ -540,7 +562,8 @@ public class IcebergArrowColumnVector extends ColumnVector {
if (stringResult.isSet == 0) {
return null;
} else {
- return UTF8String.fromAddress(null,
+ return UTF8String.fromAddress(
+ null,
stringResult.buffer.memoryAddress() + stringResult.start,
stringResult.end - stringResult.start);
}
@@ -612,7 +635,6 @@ public class IcebergArrowColumnVector extends ColumnVector {
}
}
-
private class DateAccessor extends ArrowVectorAccessor {
private final DateDayVector vector;
@@ -688,11 +710,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
/**
* Any call to "get" method will throw UnsupportedOperationException.
- *
- * Access struct values in a ArrowColumnVector doesn't use this vector. Instead, it uses
- * getStruct() method defined in the parent class. Any call to "get" method in this class is a
- * bug in the code.
- *
+ * <p>
+ * Accessing struct values in an ArrowColumnVector doesn't use this vector. Instead, it uses the getStruct() method
+ * defined in the parent class. Any call to a "get" method in this class is a bug in the code.
*/
private class StructAccessor extends ArrowVectorAccessor {
@@ -704,7 +724,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
private class DictionaryDecimalBinaryAccessor extends ArrowVectorAccessor {
private final IntVector vector;
- public DictionaryDecimalBinaryAccessor(IntVector vector, int precision, int scale) {
+ DictionaryDecimalBinaryAccessor(IntVector vector, int precision, int scale) {
super(vector);
this.vector = vector;
}
@@ -713,7 +733,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
//TODO: samarth refer to decodeDictionaryIds in VectorizedColumnReader
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
- if (isNullAt(rowId)) return null;
+ if (isNullAt(rowId)) {
+ return null;
+ }
Binary value = dictionary.decodeToBinary(vector.get(rowId));
BigInteger unscaledValue = new BigInteger(value.getBytesUnsafe());
return Decimal.apply(unscaledValue.longValue(), precision, scale);
@@ -723,7 +745,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
private class DictionaryDecimalLongAccessor extends ArrowVectorAccessor {
private final IntVector vector;
- public DictionaryDecimalLongAccessor(IntVector vector, int precision, int scale) {
+ DictionaryDecimalLongAccessor(IntVector vector, int precision, int scale) {
super(vector);
this.vector = vector;
}
@@ -731,7 +753,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
//TODO: samarth not sure this is efficient or correct
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
- if (isNullAt(rowId)) return null;
+ if (isNullAt(rowId)) {
+ return null;
+ }
long unscaledValue = dictionary.decodeToLong(vector.get(rowId));
return Decimal.apply(unscaledValue, precision, scale);
}
@@ -740,7 +764,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
private class DictionaryDecimalIntAccessor extends ArrowVectorAccessor {
private final IntVector vector;
- public DictionaryDecimalIntAccessor(IntVector vector, int precision, int scale) {
+ DictionaryDecimalIntAccessor(IntVector vector, int precision, int scale) {
super(vector);
this.vector = vector;
}
@@ -748,7 +772,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
//TODO: samarth not sure this is efficient or correct
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
- if (isNullAt(rowId)) return null;
+ if (isNullAt(rowId)) {
+ return null;
+ }
int unscaledValue = dictionary.decodeToInt(vector.get(rowId));
return Decimal.apply(unscaledValue, precision, scale);
}
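
The nullability shortcut described in the Javadoc above boils down to the following pattern; a sketch with hypothetical class and field names:

    import org.apache.iceberg.parquet.vectorized.NullabilityHolder;

    // Instead of Arrow's validity-buffer bit math, null checks are answered
    // by the batch-level NullabilityHolder computed while decoding.
    class NullabilityBackedVector {
      private final NullabilityHolder nulls;

      NullabilityBackedVector(NullabilityHolder nulls) {
        this.nulls = nulls;
      }

      public boolean isNullAt(int rowId) {
        return nulls.isNullAt(rowId); // plain boolean[] lookup
      }
    }
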
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
similarity index 88%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
index 9a833b0..932f30a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergDecimalArrowVector.java
@@ -17,17 +17,15 @@
* under the License.
*/
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.DecimalVector;
import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
/**
- *
- * Extension of Arrow's @{@link DecimalVector}. The whole reason of having
- * this implementation is to override the expensive {@link DecimalVector#isSet(int)} method
- * used by {@link DecimalVector#getObject(int)}.
+ * Extension of Arrow's {@link DecimalVector}. The whole reason for having this implementation is to override the
+ * expensive {@link DecimalVector#isSet(int)} method used by {@link DecimalVector#getObject(int)}.
*/
public class IcebergDecimalArrowVector extends DecimalVector {
private NullabilityHolder nullabilityHolder;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
similarity index 90%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
index 6d8922b..2dc5fb1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarBinaryArrowVector.java
@@ -17,15 +17,15 @@
* under the License.
*/
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
/**
- * Extension of Arrow's @{@link VarBinaryVector}. The whole reason of having
- * this implementation is to override the expensive {@link VarBinaryVector#isSet(int)} method.
+ * Extension of Arrow's {@link VarBinaryVector}. The whole reason for having this implementation is to override the
+ * expensive {@link VarBinaryVector#isSet(int)} method.
*/
public class IcebergVarBinaryArrowVector extends VarBinaryVector {
private NullabilityHolder nullabilityHolder;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
similarity index 88%
rename from parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
index 646ddd1..0802f6c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergVarcharArrowVector.java
@@ -17,15 +17,15 @@
* under the License.
*/
-package org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow;
+package org.apache.iceberg.parquet.arrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
/**
- * Extension of Arrow's @{@link VarCharVector}. The reason of having
- * this implementation is to override the expensive {@link VarCharVector#isSet(int)} method.
+ * Extension of Arrow's {@link VarCharVector}. The reason for having this implementation is to override the expensive
+ * {@link VarCharVector#isSet(int)} method.
*/
public class IcebergVarcharArrowVector extends VarCharVector {
@@ -40,7 +40,7 @@ public class IcebergVarcharArrowVector extends VarCharVector {
/**
* Same as {@link #isNull(int)}.
*
- * @param index position of element
+ * @param index position of element
* @return 1 if element at given index is not null, 0 otherwise
*/
public int isSet(int index) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
index 64c78af..5ac391b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/ColumnarBatchReaders.java
@@ -1,7 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iceberg.parquet.vectorized;
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Map;
import org.apache.arrow.vector.FieldVector;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
+import org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
@@ -9,60 +31,56 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.vectorized.ColumnarBatch;
-import java.lang.reflect.Array;
-import java.util.List;
-import java.util.Map;
-
/**
- * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's
- * vectorized read path. The {@link ColumnarBatch} returned is created by passing in the
- * Arrow vectors populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
+ * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized read path. The
+ * {@link ColumnarBatch} returned is created by passing in the Arrow vectors populated via delegated read calls to
+ * {@linkplain VectorizedArrowReader VectorReader(s)}.
*/
public class ColumnarBatchReaders implements VectorizedReader {
- private final VectorizedArrowReader[] readers;
-
- public ColumnarBatchReaders(List<Type> types,
- Types.StructType icebergExpectedFields,
- List<VectorizedReader> readers) {
- this.readers = (VectorizedArrowReader[]) Array.newInstance(
- VectorizedArrowReader.class, readers.size());
- int i = 0;
- for (VectorizedReader reader : readers) {
- this.readers[i] = (VectorizedArrowReader) reader;
- i++;
- }
+ private final VectorizedArrowReader[] readers;
+ public ColumnarBatchReaders(
+ List<Type> types,
+ Types.StructType icebergExpectedFields,
+ List<VectorizedReader> readers) {
+ this.readers = (VectorizedArrowReader[]) Array.newInstance(
+ VectorizedArrowReader.class, readers.size());
+ int idx = 0;
+ for (VectorizedReader reader : readers) {
+ this.readers[idx] = (VectorizedArrowReader) reader;
+ idx++;
}
+ }
- public final void setRowGroupInfo(PageReadStore pageStore,
- DictionaryPageReadStore dictionaryPageReadStore,
- Map<ColumnPath, Boolean> columnDictEncoded) {
- for (int i = 0; i < readers.length; i += 1) {
- readers[i].setRowGroupInfo(pageStore, dictionaryPageReadStore, columnDictEncoded);
- }
+ public final void setRowGroupInfo(
+ PageReadStore pageStore,
+ DictionaryPageReadStore dictionaryPageReadStore,
+ Map<ColumnPath, Boolean> columnDictEncoded) {
+ for (int i = 0; i < readers.length; i += 1) {
+ readers[i].setRowGroupInfo(pageStore, dictionaryPageReadStore, columnDictEncoded);
}
+ }
- public final ColumnarBatch read(ColumnarBatch ignore) {
- IcebergArrowColumnVector[] icebergArrowColumnVectors = (IcebergArrowColumnVector[]) Array.newInstance(IcebergArrowColumnVector.class,
- readers.length);
- int numRows = 0;
- for (int i = 0; i < readers.length; i += 1) {
- NullabilityHolder nullabilityHolder = new NullabilityHolder(readers[i].batchSize());
- VectorHolder holder = readers[i].read(nullabilityHolder);
- FieldVector vector = holder.getVector();
- icebergArrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
- if (i > 0 && numRows != vector.getValueCount()) {
- throw new IllegalStateException("Different number of values returned by readers" +
- "for columns " + readers[i - 1] + " and " + readers[i]);
- }
- numRows = vector.getValueCount();
- }
-
- ColumnarBatch batch = new ColumnarBatch(icebergArrowColumnVectors);
- batch.setNumRows(numRows);
-
- return batch;
+ public final ColumnarBatch read(ColumnarBatch ignore) {
+ IcebergArrowColumnVector[] icebergArrowColumnVectors = (IcebergArrowColumnVector[]) Array.newInstance(
+ IcebergArrowColumnVector.class,
+ readers.length);
+ int numRows = 0;
+ for (int i = 0; i < readers.length; i += 1) {
+ NullabilityHolder nullabilityHolder = new NullabilityHolder(readers[i].batchSize());
+ VectorHolder holder = readers[i].read(nullabilityHolder);
+ FieldVector vector = holder.getVector();
+ icebergArrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
+ if (i > 0 && numRows != vector.getValueCount()) {
+ throw new IllegalStateException("Different number of values returned by readers " +
+ "for columns " + readers[i - 1] + " and " + readers[i]);
+ }
+ numRows = vector.getValueCount();
}
-}
+ ColumnarBatch batch = new ColumnarBatch(icebergArrowColumnVectors);
+ batch.setNumRows(numRows);
+ return batch;
+ }
+}
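
From the caller's side, the read path above looks roughly like this; a hypothetical sketch (the page stores come from the Parquet file reader):

    import java.util.Map;
    import org.apache.parquet.column.page.DictionaryPageReadStore;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    // Point the reader at one row group, then pull a ColumnarBatch whose
    // columns wrap the freshly filled Arrow vectors.
    static int readOneBatch(
        ColumnarBatchReaders reader,
        PageReadStore pageStore,
        DictionaryPageReadStore dictionaryPageStore,
        Map<ColumnPath, Boolean> columnDictEncoded) {
      reader.setRowGroupInfo(pageStore, dictionaryPageStore, columnDictEncoded);
      ColumnarBatch batch = reader.read(null); // the argument is ignored by this reader
      return batch.numRows();
    }
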
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
index 70b84d9..db625ea 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/NullabilityHolder.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iceberg.parquet.vectorized;
public class NullabilityHolder {
@@ -26,22 +27,11 @@ public class NullabilityHolder {
this.isNull = new boolean[batchSize];
}
-
public void setNull(int idx) {
- isNull[idx] = true;
+ isNull[idx] = true;
numNulls++;
}
- public void setNulls(int idx, int num) {
- int i = 0;
- while (i < num) {
- isNull[idx] = true;
- numNulls++;
- idx++;
- i++;
- }
- }
-
public boolean isNullAt(int idx) {
return isNull[idx];
}
@@ -53,4 +43,4 @@ public class NullabilityHolder {
public int numNulls() {
return numNulls;
}
-}
+}
\ No newline at end of file
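
In use, the holder is filled while a batch is decoded and queried per row afterwards; a hypothetical sketch in which definition levels stand in for the real decode loop:

    // Record a null for every value whose definition level falls short of the
    // column's max; numNulls() keeps a running count as setNull() is called.
    static NullabilityHolder trackNulls(int batchSize, int[] definitionLevels, int maxDefinitionLevel) {
      NullabilityHolder nulls = new NullabilityHolder(batchSize);
      for (int row = 0; row < batchSize; row++) {
        if (definitionLevels[row] < maxDefinitionLevel) {
          nulls.setNull(row);
        }
      }
      return nulls;
    }
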
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
index f11c968..d28a6af 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorHolder.java
@@ -19,46 +19,47 @@
package org.apache.iceberg.parquet.vectorized;
+import javax.annotation.Nullable;
import org.apache.arrow.vector.FieldVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
-import javax.annotation.Nullable;
-
/**
- * Container class for holding the Arrow vector holding a batch of values
- * along with other state needed for reading values out of it.
+ * Container class for the Arrow vector holding a batch of values, along with other state needed for reading
+ * values out of it.
*/
public class VectorHolder {
- private final ColumnDescriptor columnDescriptor;
- private final FieldVector vector;
- private final boolean isDictionaryEncoded;
-
- @Nullable
- private final Dictionary dictionary;
-
+ private final ColumnDescriptor columnDescriptor;
+ private final FieldVector vector;
+ private final boolean isDictionaryEncoded;
- public VectorHolder(ColumnDescriptor columnDescriptor, FieldVector vector, boolean isDictionaryEncoded, Dictionary dictionary) {
- this.columnDescriptor = columnDescriptor;
- this.vector = vector;
- this.isDictionaryEncoded = isDictionaryEncoded;
- this.dictionary = dictionary;
- }
+ @Nullable
+ private final Dictionary dictionary;
- public ColumnDescriptor getDescriptor() {
- return columnDescriptor;
- }
+ public VectorHolder(
+ ColumnDescriptor columnDescriptor,
+ FieldVector vector,
+ boolean isDictionaryEncoded,
+ Dictionary dictionary) {
+ this.columnDescriptor = columnDescriptor;
+ this.vector = vector;
+ this.isDictionaryEncoded = isDictionaryEncoded;
+ this.dictionary = dictionary;
+ }
- public FieldVector getVector() {
- return vector;
- }
+ public ColumnDescriptor getDescriptor() {
+ return columnDescriptor;
+ }
- public boolean isDictionaryEncoded() {
- return isDictionaryEncoded;
- }
+ public FieldVector getVector() {
+ return vector;
+ }
- public Dictionary getDictionary() {
- return dictionary;
- }
+ public boolean isDictionaryEncoded() {
+ return isDictionaryEncoded;
+ }
-}
+ public Dictionary getDictionary() {
+ return dictionary;
+ }
+}
\ No newline at end of file
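
A hypothetical consumption sketch for the container above; dictionary-encoded columns carry the dictionary needed to decode the vector's entries:

    import org.apache.arrow.vector.FieldVector;
    import org.apache.parquet.column.Dictionary;

    // Unpack a holder returned by a batch read; the dictionary is only
    // meaningful when the column was dictionary-encoded.
    static FieldVector unpack(VectorHolder holder) {
      if (holder.isDictionaryEncoded()) {
        Dictionary dictionary = holder.getDictionary();
        // decode vector entries through the dictionary here
      }
      return holder.getVector();
    }
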
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
index f44c5dc..9da909d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedArrowReader.java
@@ -19,16 +19,25 @@
package org.apache.iceberg.parquet.vectorized;
+import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
-import org.apache.iceberg.arrow.ArrowSchemaUtil;
import org.apache.iceberg.parquet.ParquetUtil;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergDecimalArrowVector;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarBinaryArrowVector;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergVarcharArrowVector;
+import org.apache.iceberg.parquet.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.arrow.IcebergDecimalArrowVector;
+import org.apache.iceberg.parquet.arrow.IcebergVarBinaryArrowVector;
+import org.apache.iceberg.parquet.arrow.IcebergVarcharArrowVector;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -38,8 +47,6 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.PrimitiveType;
-import java.util.Map;
-
/***
* {@link VectorizedReader VectorReader(s)} that read in a batch of values into Arrow vectors.
* It also takes care of allocating the right kind of Arrow vectors depending on the corresponding
@@ -79,12 +86,12 @@ public class VectorizedArrowReader implements VectorizedReader {
public VectorizedArrowReader(
ColumnDescriptor desc,
Types.NestedField icebergField,
- BufferAllocator rootAlloc,
+ BufferAllocator ra,
int batchSize) {
this.icebergField = icebergField;
this.batchSize = (batchSize == 0) ? DEFAULT_BATCH_SIZE : batchSize;
this.columnDescriptor = desc;
- this.rootAlloc = rootAlloc;
+ this.rootAlloc = ra;
this.isFixedLengthDecimal = ParquetUtil.isFixedLengthDecimal(desc);
this.isVarWidthType = ParquetUtil.isVarWidthType(desc);
this.isFixedWidthBinary = ParquetUtil.isFixedWidthBinary(desc);
@@ -97,9 +104,10 @@ public class VectorizedArrowReader implements VectorizedReader {
this.vectorizedColumnIterator = new VectorizedColumnIterator(desc, "", batchSize);
}
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public VectorHolder read(NullabilityHolder nullabilityHolder) {
if (vec == null) {
- typeWidth = allocateFieldVector(rootAlloc, icebergField, columnDescriptor);
+ typeWidth = allocateFieldVector();
}
vec.setValueCount(0);
if (vectorizedColumnIterator.hasNext()) {
@@ -141,16 +149,19 @@ public class VectorizedArrowReader implements VectorizedReader {
return new VectorHolder(columnDescriptor, vec, allPagesDictEncoded, dictionary);
}
- private int allocateFieldVector(BufferAllocator rootAlloc, Types.NestedField icebergField, ColumnDescriptor desc) {
+ private int allocateFieldVector() {
if (allPagesDictEncoded) {
- Field field = new Field(icebergField.name(), new FieldType(icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null), null);
+ Field field = new Field(
+ icebergField.name(),
+ new FieldType(icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null),
+ null);
this.vec = field.createVector(rootAlloc);
((IntVector) vec).allocateNew(batchSize);
return IntVector.TYPE_WIDTH;
} else {
- PrimitiveType primitive = desc.getPrimitiveType();
+ PrimitiveType primitive = columnDescriptor.getPrimitiveType();
if (primitive.getOriginalType() != null) {
- switch (desc.getPrimitiveType().getOriginalType()) {
+ switch (columnDescriptor.getPrimitiveType().getOriginalType()) {
case ENUM:
case JSON:
case UTF8:
@@ -182,7 +193,7 @@ public class VectorizedArrowReader implements VectorizedReader {
case DECIMAL:
DecimalMetadata decimal = primitive.getDecimalMetadata();
this.vec = new IcebergDecimalArrowVector(icebergField.name(), rootAlloc, decimal.getPrecision(),
- decimal.getScale());
+ decimal.getScale());
((DecimalVector) vec).allocateNew(batchSize);
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
@@ -194,18 +205,18 @@ public class VectorizedArrowReader implements VectorizedReader {
return IntVector.TYPE_WIDTH;
default:
throw new UnsupportedOperationException(
- "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+ "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
default:
throw new UnsupportedOperationException(
- "Unsupported logical type: " + primitive.getOriginalType());
+ "Unsupported logical type: " + primitive.getOriginalType());
}
} else {
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
int len = ((Types.FixedType) icebergField.type()).length();
this.vec = new IcebergVarBinaryArrowVector(icebergField.name(), rootAlloc);
- int factor = (len + DEFAULT_RECORD_BYTE_COUNT - 1) / (DEFAULT_RECORD_BYTE_COUNT);
+ int factor = (len + DEFAULT_RECORD_BYTE_COUNT - 1) / DEFAULT_RECORD_BYTE_COUNT;
vec.setInitialCapacity(batchSize * factor);
vec.allocateNew();
return len;
@@ -242,9 +253,10 @@ public class VectorizedArrowReader implements VectorizedReader {
}
}
- public void setRowGroupInfo(PageReadStore source,
- DictionaryPageReadStore dictionaryPageReadStore,
- Map<ColumnPath, Boolean> columnDictEncoded) {
+ public void setRowGroupInfo(
+ PageReadStore source,
+ DictionaryPageReadStore dictionaryPageReadStore,
+ Map<ColumnPath, Boolean> columnDictEncoded) {
allPagesDictEncoded = columnDictEncoded.get(ColumnPath.get(columnDescriptor.getPath()));
dictionary = vectorizedColumnIterator.setRowGroupInfo(source, dictionaryPageReadStore, allPagesDictEncoded);
}
@@ -257,6 +269,4 @@ public class VectorizedArrowReader implements VectorizedReader {
public int batchSize() {
return batchSize;
}
-
}
-
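The allocateFieldVector change above keeps one invariant worth spelling out: when every page of the column in the row group is dictionary encoded, the reader allocates an IntVector for 32-bit dictionary ids regardless of the logical type; only otherwise does it allocate the typed vector. A reduced sketch of that dispatch showing just the long-backed branch (hypothetical class, not this commit's code):

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;
    import org.apache.arrow.vector.FieldVector;
    import org.apache.arrow.vector.IntVector;

    class AllocationSketch {
      static FieldVector allocate(boolean allPagesDictEncoded, BufferAllocator allocator, int batchSize) {
        if (allPagesDictEncoded) {
          // dictionary ids are plain 32-bit ints no matter the column type
          IntVector ids = new IntVector("dictionary-ids", allocator);
          ids.allocateNew(batchSize);
          return ids;
        }
        // the real reader switches over every Parquet primitive/logical type;
        // only the INT64 branch is sketched here
        BigIntVector values = new BigIntVector("values", allocator);
        values.allocateNew(batchSize);
        return values;
      }

      public static void main(String[] args) throws Exception {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
          FieldVector vec = allocate(true, allocator, 1024);
          vec.close();
        }
      }
    }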
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
index fafaec6..703e68e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedColumnIterator.java
@@ -20,17 +20,20 @@
package org.apache.iceberg.parquet.vectorized;
import java.io.IOException;
-
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.io.ParquetDecodingException;
/**
- * Vectorized version of the ColumnIterator that reads column values in data pages of a column
- * in a row group in a batched fashion.
+ * Vectorized version of the ColumnIterator that reads column values in data pages of a column in a row group in a
+ * batched fashion.
*/
public class VectorizedColumnIterator {
@@ -50,15 +53,16 @@ public class VectorizedColumnIterator {
this.vectorizedPageIterator = new VectorizedPageIterator(desc, writerVersion, batchSize);
}
- public Dictionary setRowGroupInfo(PageReadStore store,
- DictionaryPageReadStore dictionaryPageReadStore,
- boolean allPagesDictEncoded) {
+ public Dictionary setRowGroupInfo(
+ PageReadStore store,
+ DictionaryPageReadStore dictionaryPageReadStore,
+ boolean allPagesDictEncoded) {
this.columnPageReader = store.getPageReader(desc);
this.totalValuesCount = columnPageReader.getTotalValueCount();
this.valuesRead = 0L;
this.advanceNextPageCount = 0L;
this.vectorizedPageIterator.reset();
- Dictionary dict = readDictionaryForColumn(desc, dictionaryPageReadStore);
+ Dictionary dict = readDictionaryForColumn(dictionaryPageReadStore);
this.vectorizedPageIterator.setDictionaryForColumn(dict, allPagesDictEncoded);
advance();
return dict;
@@ -91,7 +95,7 @@ public class VectorizedColumnIterator {
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
int rowsInThisBatch = vectorizedPageIterator.nextBatchIntegers(fieldVector, batchSize - rowsReadSoFar,
- rowsReadSoFar, typeWidth, holder);
+ rowsReadSoFar, typeWidth, holder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
fieldVector.setValueCount(rowsReadSoFar);
@@ -106,7 +110,7 @@ public class VectorizedColumnIterator {
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
int rowsInThisBatch = vectorizedPageIterator.nextBatchDictionaryIds(vector, batchSize - rowsReadSoFar,
- rowsReadSoFar, holder);
+ rowsReadSoFar, holder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
vector.setValueCount(rowsReadSoFar);
@@ -161,11 +165,15 @@ public class VectorizedColumnIterator {
/**
* Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
*/
- public void nextBatchIntLongBackedDecimal(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
+ public void nextBatchIntLongBackedDecimal(
+ FieldVector fieldVector,
+ int typeWidth,
+ NullabilityHolder nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch =
+ vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -176,11 +184,15 @@ public class VectorizedColumnIterator {
/**
* Method for reading a batch of decimals backed by fixed length byte array parquet data type.
*/
- public void nextBatchFixedLengthDecimal(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
+ public void nextBatchFixedLengthDecimal(
+ FieldVector fieldVector,
+ int typeWidth,
+ NullabilityHolder nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch =
+ vectorizedPageIterator.nextBatchFixedLengthDecimal(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -196,7 +208,7 @@ public class VectorizedColumnIterator {
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
int rowsInThisBatch = vectorizedPageIterator.nextBatchVarWidthType(fieldVector, batchSize - rowsReadSoFar,
- rowsReadSoFar, nullabilityHolder);
+ rowsReadSoFar, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
fieldVector.setValueCount(rowsReadSoFar);
@@ -210,7 +222,8 @@ public class VectorizedColumnIterator {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
- int rowsInThisBatch = vectorizedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
+ int rowsInThisBatch =
+ vectorizedPageIterator.nextBatchFixedWidthBinary(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
@@ -226,15 +239,15 @@ public class VectorizedColumnIterator {
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
int rowsInThisBatch = vectorizedPageIterator.nextBatchBoolean(fieldVector, batchSize - rowsReadSoFar,
- rowsReadSoFar, nullabilityHolder);
+ rowsReadSoFar, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.valuesRead += rowsInThisBatch;
fieldVector.setValueCount(rowsReadSoFar);
}
}
- private Dictionary readDictionaryForColumn(ColumnDescriptor desc,
- DictionaryPageReadStore dictionaryPageReadStore) {
+ private Dictionary readDictionaryForColumn(
+ DictionaryPageReadStore dictionaryPageReadStore) {
if (dictionaryPageReadStore == null) {
return null;
}
@@ -248,5 +261,4 @@ public class VectorizedColumnIterator {
}
return null;
}
-
-}
+}
\ No newline at end of file
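Every nextBatch* method in the file above follows one accumulation pattern: pull from the page iterator, advancing to the next page as each one drains, until the batch is full or the row group is exhausted. The loop reduced to its shape (stand-in interface, not the commit's types):

    class BatchAccumulationSketch {
      // Stand-in for VectorizedPageIterator: supplies at most 'wanted' values
      // from the current page and reports whether more values remain.
      interface PageSource {
        int nextFromCurrentPage(int wanted);
        boolean hasNext();
      }

      static int readBatch(PageSource pages, int batchSize) {
        int rowsReadSoFar = 0;
        while (rowsReadSoFar < batchSize && pages.hasNext()) {
          // the real iterator calls advance() here to load the next page
          rowsReadSoFar += pages.nextFromCurrentPage(batchSize - rowsReadSoFar);
        }
        return rowsReadSoFar;
      }
    }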
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
index 618be17..c62baf3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedPageIterator.java
@@ -19,12 +19,15 @@
package org.apache.iceberg.parquet.vectorized;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+
import com.google.common.base.Preconditions;
+import java.io.IOException;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.iceberg.parquet.BytesReader;
+import org.apache.iceberg.parquet.ValuesAsBytesReader;
import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
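In the hunk below, initDataReader derives eagerDecodeDictionary from three facts: the page's encoding uses a dictionary, a dictionary was actually read for the column, and not every page in the row group is dictionary encoded. A condensed sketch of that three-way decision (booleans only, not the commit's reader classes):

    class DecodePathSketch {
      enum Path { PLAIN, DICTIONARY_IDS, EAGER_DECODE }

      // Mirrors: eagerDecodeDictionary = dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded
      static Path choose(boolean pageUsesDictionary, boolean dictionaryPresent, boolean allPagesDictEncoded) {
        if (!pageUsesDictionary) {
          return Path.PLAIN;
        }
        if (!dictionaryPresent) {
          // the real code throws ParquetDecodingException: a dictionary
          // encoded page without its dictionary cannot be read
          throw new IllegalStateException("missing dictionary");
        }
        // when every page is dictionary encoded, ids flow straight into an
        // IntVector and are decoded lazily; otherwise decode eagerly per batch
        return allPagesDictEncoded ? Path.DICTIONARY_IDS : Path.EAGER_DECODE;
      }
    }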
@@ -42,404 +45,519 @@ import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-
public class VectorizedPageIterator {
- private static final Logger LOG = LoggerFactory.getLogger(VectorizedPageIterator.class);
- public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
- this.desc = desc;
- this.writerVersion = writerVersion;
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedPageIterator.class);
+
+ public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, int batchSize) {
+ this.desc = desc;
+ this.writerVersion = writerVersion;
+ }
+
+ private final ColumnDescriptor desc;
+ private final String writerVersion;
+
+ // iterator state
+ private boolean hasNext = false;
+ private int triplesRead = 0;
+
+ // page bookkeeping
+ private Dictionary dict = null;
+ private DataPage page = null;
+ private int triplesCount = 0;
+
+ // Needed once we add support for complex types. Unused for now.
+ private IntIterator repetitionLevels = null;
+ private int currentRL = 0;
+
+ private VectorizedParquetValuesReader definitionLevelReader;
+ private boolean eagerDecodeDictionary;
+ private ValuesAsBytesReader plainValuesReader = null;
+ private VectorizedParquetValuesReader dictionaryEncodedValuesReader = null;
+ private boolean allPagesDictEncoded;
+
+ public void setPage(DataPage page) {
+ this.page = Preconditions.checkNotNull(page, "Cannot read from null page");
+ this.page.accept(new DataPage.Visitor<ValuesReader>() {
+ @Override
+ public ValuesReader visit(DataPageV1 dataPageV1) {
+ initFromPage(dataPageV1);
+ return null;
+ }
+
+ @Override
+ public ValuesReader visit(DataPageV2 dataPageV2) {
+ initFromPage(dataPageV2);
+ return null;
+ }
+ });
+ this.triplesRead = 0;
+ advance();
+ }
+
+ // Dictionary is set per row group
+ public void setDictionaryForColumn(Dictionary dict, boolean allPagesDictEncoded) {
+ this.dict = dict;
+ this.allPagesDictEncoded = allPagesDictEncoded;
+ }
+
+ public void reset() {
+ this.page = null;
+ this.triplesCount = 0;
+ this.triplesRead = 0;
+ this.repetitionLevels = null;
+ this.plainValuesReader = null;
+ this.definitionLevelReader = null;
+ this.hasNext = false;
+ }
+
+ public int currentPageCount() {
+ return triplesCount;
+ }
+
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ /**
+   * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like definition levels,
+ * dictionary ids in Parquet are RLE encoded as well.
+ */
+ public int nextBatchDictionaryIds(
+ final IntVector vector, final int expectedBatchSize,
+ final int numValsInVector,
+ NullabilityHolder holder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- private final ColumnDescriptor desc;
- private final String writerVersion;
-
- // iterator state
- private boolean hasNext = false;
- private int triplesRead = 0;
-
- // page bookkeeping
- private Dictionary dict = null;
- private DataPage page = null;
- private int triplesCount = 0;
-
- // Needed once we add support for complex types. Unused for now.
- private IntIterator repetitionLevels = null;
- private int currentRL = 0;
-
- private VectorizedParquetValuesReader definitionLevelReader;
- private boolean eagerDecodeDictionary;
- private BytesReader plainValuesReader = null;
- private VectorizedParquetValuesReader dictionaryEncodedValuesReader = null;
- private boolean allPagesDictEncoded;
-
- public void setPage(DataPage page) {
- this.page = Preconditions.checkNotNull(page, "Cannot read from null page");
- this.page.accept(new DataPage.Visitor<ValuesReader>() {
- @Override
- public ValuesReader visit(DataPageV1 dataPageV1) {
- initFromPage(dataPageV1);
- return null;
- }
-
- @Override
- public ValuesReader visit(DataPageV2 dataPageV2) {
- initFromPage(dataPageV2);
- return null;
- }
- });
- this.triplesRead = 0;
- advance();
+ definitionLevelReader.readBatchOfDictionaryIds(
+ vector,
+ numValsInVector,
+ actualBatchSize,
+ holder,
+ dictionaryEncodedValuesReader);
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+ * Method for reading a batch of values of INT32 data type
+ */
+ public int nextBatchIntegers(
+ final FieldVector vector, final int expectedBatchSize,
+ final int numValsInVector,
+ final int typeWidth, NullabilityHolder holder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- // Dictionary is set per row group
- public void setDictionaryForColumn(Dictionary dict, boolean allPagesDictEncoded) {
- this.dict = dict;
- this.allPagesDictEncoded = allPagesDictEncoded;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedIntegers(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchOfIntegers(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ plainValuesReader);
}
-
- public void reset() {
- this.page = null;
- this.triplesCount = 0;
- this.triplesRead = 0;
- this.repetitionLevels = null;
- this.plainValuesReader = null;
- this.definitionLevelReader = null;
- this.hasNext = false;
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+ * Method for reading a batch of values of INT64 data type
+ */
+ public int nextBatchLongs(
+ final FieldVector vector, final int expectedBatchSize,
+ final int numValsInVector,
+ final int typeWidth, NullabilityHolder holder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- public int currentPageCount() {
- return triplesCount;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedLongs(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchOfLongs(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ plainValuesReader);
}
-
- public boolean hasNext() {
- return hasNext;
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+ * Method for reading a batch of values of FLOAT data type.
+ */
+ public int nextBatchFloats(
+ final FieldVector vector, final int expectedBatchSize,
+ final int numValsInVector,
+ final int typeWidth, NullabilityHolder holder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- /**
- * Method for reading a batch of dictionary ids from the dicitonary encoded data pages. Like definition levels, dictionary ids in Parquet are RLE
- * encoded as well.
- */
- public int nextBatchDictionaryIds(final IntVector vector, final int expectedBatchSize,
- final int numValsInVector,
- NullabilityHolder holder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- definitionLevelReader.readBatchOfDictionaryIds(vector, numValsInVector, actualBatchSize, holder, dictionaryEncodedValuesReader);
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedFloats(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchOfFloats(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ plainValuesReader);
}
-
- /**
- * Method for reading a batch of values of INT32 data type
- */
- public int nextBatchIntegers(final FieldVector vector, final int expectedBatchSize,
- final int numValsInVector,
- final int typeWidth, NullabilityHolder holder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedIntegers(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchOfIntegers(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+ * Method for reading a batch of values of DOUBLE data type
+ */
+ public int nextBatchDoubles(
+ final FieldVector vector, final int expectedBatchSize,
+ final int numValsInVector,
+ final int typeWidth, NullabilityHolder holder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- /**
- * Method for reading a batch of values of INT64 data type
- */
- public int nextBatchLongs(final FieldVector vector, final int expectedBatchSize,
- final int numValsInVector,
- final int typeWidth, NullabilityHolder holder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedLongs(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchOfLongs(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedDoubles(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchOfDoubles(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ holder,
+ plainValuesReader);
}
-
- /**
- * Method for reading a batch of values of FLOAT data type.
- */
- public int nextBatchFloats(final FieldVector vector, final int expectedBatchSize,
- final int numValsInVector,
- final int typeWidth, NullabilityHolder holder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedFloats(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchOfFloats(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ private int getActualBatchSize(int expectedBatchSize) {
+ return Math.min(expectedBatchSize, triplesCount - triplesRead);
+ }
+
+ /**
+ * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since Arrow stores all
+ * decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
+ */
+ public int nextBatchIntLongBackedDecimal(
+ final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+ final int typeWidth, NullabilityHolder nullabilityHolder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- /**
- * Method for reading a batch of values of DOUBLE data type
- */
- public int nextBatchDoubles(final FieldVector vector, final int expectedBatchSize,
- final int numValsInVector,
- final int typeWidth, NullabilityHolder holder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedDoubles(vector, numValsInVector, typeWidth, actualBatchSize, holder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchOfDoubles(vector, numValsInVector, typeWidth, actualBatchSize, holder, plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedIntLongBackedDecimals(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchOfIntLongBackedDecimals(vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ plainValuesReader);
}
-
- private int getActualBatchSize(int expectedBatchSize) {
- return Math.min(expectedBatchSize, triplesCount - triplesRead);
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+ * Method for reading a batch of decimals backed by fixed length byte array parquet data type. Arrow stores all
+ * decimals in 16 bytes. This method provides the necessary padding to the decimals read. Moreover, Arrow interprets
+ * the decimals in Arrow buffer as little endian. Parquet stores fixed length decimals as big endian. So, this method
+ * uses {@link DecimalVector#setBigEndian(int, byte[])} method so that the data in Arrow vector is indeed little
+ * endian.
+ */
+ public int nextBatchFixedLengthDecimal(
+ final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+ final int typeWidth, NullabilityHolder nullabilityHolder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- /**
- * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types.
- * Since Arrow stores all decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
- */
- public int nextBatchIntLongBackedDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
- final int typeWidth, NullabilityHolder nullabilityHolder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedIntLongBackedDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchOfIntLongBackedDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder,
- plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchOfFixedLengthDecimals(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ plainValuesReader);
}
-
- /**
- * Method for reading a batch of decimals backed by fixed length byte array parquet data type.
- * Arrow stores all decimals in 16 bytes. This method provides the necessary padding to the decimals read.
- * Moreover, Arrow interprets the decimals in Arrow buffer as little endian. Parquet stores fixed length decimals
- * as big endian. So, this method uses {@link DecimalVector#setBigEndian(int, byte[])} method so that the data in
- * Arrow vector is indeed little endian.
- */
- public int nextBatchFixedLengthDecimal(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
- final int typeWidth, NullabilityHolder nullabilityHolder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchOfFixedLengthDecimals(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+   * Method for reading a batch of values of variable width data types (ENUM, JSON, UTF8, BSON).
+ */
+ public int nextBatchVarWidthType(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      NullabilityHolder nullabilityHolder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- /**
- * Method for reading a batch of variable width data type (ENUM, JSON, UTF8, BSON).
- */
- public int nextBatchVarWidthType(final FieldVector vector, final int expectedBatchSize, final int numValsInVector
- , NullabilityHolder nullabilityHolder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedVarWidth(vector, numValsInVector, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchVarWidth(vector, numValsInVector, actualBatchSize, nullabilityHolder, plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedVarWidth(
+ vector,
+ numValsInVector,
+ actualBatchSize,
+ nullabilityHolder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchVarWidth(
+ vector,
+ numValsInVector,
+ actualBatchSize,
+ nullabilityHolder,
+ plainValuesReader);
}
-
- /**
- * Method for reading batches of fixed width binary type (e.g. BYTE[7]).
- * Spark does not support fixed width binary data type. To work around this limitation, the data is read as
- * fixed width binary from parquet and stored in a {@link VarBinaryVector} in Arrow.
- */
- public int nextBatchFixedWidthBinary(final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
- final int typeWidth, NullabilityHolder nullabilityHolder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- if (eagerDecodeDictionary) {
- definitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, dictionaryEncodedValuesReader, dict);
- } else {
- definitionLevelReader.readBatchOfFixedWidthBinary(vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder, plainValuesReader);
- }
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+   * Method for reading batches of the fixed width binary type (e.g. BYTE[7]). Spark does not support the fixed width
+   * binary data type. To work around this limitation, the data is read as fixed width binary from Parquet and stored
+   * in a {@link VarBinaryVector} in Arrow.
+ */
+ public int nextBatchFixedWidthBinary(
+ final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+ final int typeWidth, NullabilityHolder nullabilityHolder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- /**
- * Method for reading batches of booleans.
- */
- public int nextBatchBoolean(final FieldVector vector, final int expectedBatchSize, final int numValsInVector, NullabilityHolder nullabilityHolder) {
- final int actualBatchSize = getActualBatchSize(expectedBatchSize);
- if (actualBatchSize <= 0) {
- return 0;
- }
- definitionLevelReader.readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
- nullabilityHolder, plainValuesReader);
- triplesRead += actualBatchSize;
- this.hasNext = triplesRead < triplesCount;
- return actualBatchSize;
+ if (eagerDecodeDictionary) {
+ definitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ dictionaryEncodedValuesReader,
+ dict);
+ } else {
+ definitionLevelReader.readBatchOfFixedWidthBinary(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ plainValuesReader);
}
-
- private void advance() {
- if (triplesRead < triplesCount) {
- this.hasNext = true;
- } else {
- this.hasNext = false;
- }
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ /**
+ * Method for reading batches of booleans.
+ */
+ public int nextBatchBoolean(
+ final FieldVector vector,
+ final int expectedBatchSize,
+ final int numValsInVector,
+ NullabilityHolder nullabilityHolder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) {
+ return 0;
}
-
- private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
- ValuesReader previousReader = plainValuesReader;
- this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded;
- if (dataEncoding.usesDictionary()) {
- if (dict == null) {
- throw new ParquetDecodingException(
- "could not read page in col " + desc + " as the dictionary was missing for encoding " + dataEncoding);
- }
- try {
- dictionaryEncodedValuesReader = new VectorizedParquetValuesReader(desc.getMaxDefinitionLevel());//(DictionaryValuesReader) dataEncoding.getDictionaryBasedValuesReader(desc, ValuesType.VALUES, dict);
- dictionaryEncodedValuesReader.initFromPage(valueCount, in);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page in col " + desc, e);
- }
- } else {
- plainValuesReader = new BytesReader();
- plainValuesReader.initFromPage(valueCount, in);
- }
- if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
- previousReader != null && previousReader instanceof RequiresPreviousReader) {
- // previous reader can only be set if reading sequentially
- ((RequiresPreviousReader) plainValuesReader).setPreviousReader(previousReader);
- }
+ definitionLevelReader.readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
+ nullabilityHolder, plainValuesReader);
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount;
+ return actualBatchSize;
+ }
+
+ private void advance() {
+ if (triplesRead < triplesCount) {
+ this.hasNext = true;
+ } else {
+ this.hasNext = false;
}
-
-
- private void initFromPage(DataPageV1 page) {
- this.triplesCount = page.getValueCount();
- ValuesReader rlReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
- ValuesReader dlReader;
- int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
- this.definitionLevelReader = new VectorizedParquetValuesReader(
- bitWidth, desc.getMaxDefinitionLevel());
- dlReader = this.definitionLevelReader;
- this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
- try {
- BytesInput bytes = page.getBytes();
- ByteBufferInputStream in = bytes.toInputStream();
- rlReader.initFromPage(triplesCount, in);
- dlReader.initFromPage(triplesCount, in);
- initDataReader(page.getValueEncoding(), in, page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
- }
+ }
+
+ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
+ ValuesReader previousReader = plainValuesReader;
+ this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dict != null && !allPagesDictEncoded;
+ if (dataEncoding.usesDictionary()) {
+ if (dict == null) {
+ throw new ParquetDecodingException(
+ "could not read page in col " + desc + " as the dictionary was missing for encoding " + dataEncoding);
+ }
+ try {
+ dictionaryEncodedValuesReader =
+            new VectorizedParquetValuesReader(desc.getMaxDefinitionLevel());
+ dictionaryEncodedValuesReader.initFromPage(valueCount, in);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page in col " + desc, e);
+ }
+ } else {
+ plainValuesReader = new ValuesAsBytesReader();
+ plainValuesReader.initFromPage(valueCount, in);
}
-
- private void initFromPage(DataPageV2 page) {
- this.triplesCount = page.getValueCount();
- this.repetitionLevels = newRLEIterator(desc.getMaxRepetitionLevel(), page.getRepetitionLevels());
- LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
- try {
- int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
- initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
- this.definitionLevelReader = new VectorizedParquetValuesReader(bitWidth, false,
- desc.getMaxDefinitionLevel());
- definitionLevelReader.initFromPage(triplesCount, page.getDefinitionLevels().toInputStream());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
- }
+ if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
+ previousReader != null && previousReader instanceof RequiresPreviousReader) {
+ // previous reader can only be set if reading sequentially
+ ((RequiresPreviousReader) plainValuesReader).setPreviousReader(previousReader);
}
-
- private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
- try {
- if (maxLevel == 0) {
- return new NullIntIterator();
- }
- return new RLEIntIterator(
- new RunLengthBitPackingHybridDecoder(
- BytesUtils.getWidthFromMaxInt(maxLevel),
- bytes.toInputStream()));
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read levels in page for col " + desc, e);
- }
+ }
+
+ private void initFromPage(DataPageV1 page) {
+ this.triplesCount = page.getValueCount();
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
+ ValuesReader dlReader;
+ int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+ this.definitionLevelReader = new VectorizedParquetValuesReader(
+ bitWidth, desc.getMaxDefinitionLevel());
+ dlReader = this.definitionLevelReader;
+ this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
+ try {
+ BytesInput bytes = page.getBytes();
+ ByteBufferInputStream in = bytes.toInputStream();
+ rlReader.initFromPage(triplesCount, in);
+ dlReader.initFromPage(triplesCount, in);
+ initDataReader(page.getValueEncoding(), in, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
+ }
+ }
+
+ private void initFromPage(DataPageV2 page) {
+ this.triplesCount = page.getValueCount();
+ this.repetitionLevels = newRLEIterator(desc.getMaxRepetitionLevel(), page.getRepetitionLevels());
+ LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
+ try {
+ int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+ initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
+ this.definitionLevelReader = new VectorizedParquetValuesReader(bitWidth, false,
+ desc.getMaxDefinitionLevel());
+ definitionLevelReader.initFromPage(triplesCount, page.getDefinitionLevels().toInputStream());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + desc, e);
}
+ }
+
+ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+ try {
+ if (maxLevel == 0) {
+ return new NullIntIterator();
+ }
+ return new RLEIntIterator(
+ new RunLengthBitPackingHybridDecoder(
+ BytesUtils.getWidthFromMaxInt(maxLevel),
+ bytes.toInputStream()));
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read levels in page for col " + desc, e);
+ }
+ }
+
+  abstract static class IntIterator {
+ abstract int nextInt();
+ }
- static abstract class IntIterator {
- abstract int nextInt();
+ static class ValuesReaderIntIterator extends IntIterator {
+ ValuesReader delegate;
+
+ ValuesReaderIntIterator(ValuesReader delegate) {
+ super();
+ this.delegate = delegate;
}
- static class ValuesReaderIntIterator extends IntIterator {
- ValuesReader delegate;
+ @Override
+ int nextInt() {
+ return delegate.readInteger();
+ }
+ }
- ValuesReaderIntIterator(ValuesReader delegate) {
- super();
- this.delegate = delegate;
- }
+ static class RLEIntIterator extends IntIterator {
+ RunLengthBitPackingHybridDecoder delegate;
- @Override
- int nextInt() {
- return delegate.readInteger();
- }
+ RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+ this.delegate = delegate;
}
- static class RLEIntIterator extends IntIterator {
- RunLengthBitPackingHybridDecoder delegate;
-
- RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
- this.delegate = delegate;
- }
-
- @Override
- int nextInt() {
- try {
- return delegate.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
- }
+ @Override
+ int nextInt() {
+ try {
+ return delegate.readInt();
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
}
+ }
- private static final class NullIntIterator extends IntIterator {
- @Override
- int nextInt() {
- return 0;
- }
+ private static final class NullIntIterator extends IntIterator {
+ @Override
+ int nextInt() {
+ return 0;
}
-}
+ }
+}
\ No newline at end of file
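The fixed length decimal path above exists because Parquet stores FIXED_LEN_BYTE_ARRAY decimals as big endian unscaled bytes while Arrow's 16-byte decimal buffers are little endian. A standalone, runnable illustration of the padding plus setBigEndian round trip that the Javadoc describes (example values only):

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.util.Arrays;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.DecimalVector;

    class BigEndianDecimalSketch {
      public static void main(String[] args) throws Exception {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             DecimalVector vector = new DecimalVector("dec", allocator, 9, 2)) {
          vector.allocateNew(1);
          // Parquet hands over a minimal big endian unscaled value; pad it to
          // Arrow's 16-byte width with sign extension, as the reader does.
          byte[] unscaled = BigInteger.valueOf(12345).toByteArray();
          byte[] padded = new byte[16];
          byte sign = (byte) (unscaled[0] < 0 ? 0xFF : 0x00);
          Arrays.fill(padded, 0, 16 - unscaled.length, sign);
          System.arraycopy(unscaled, 0, padded, 16 - unscaled.length, unscaled.length);
          // setBigEndian flips into the little endian layout Arrow expects
          vector.setBigEndian(0, padded);
          vector.setValueCount(1);
          BigDecimal value = vector.getObject(0);
          System.out.println(value);  // 123.45: unscaled 12345 at scale 2
        }
      }
    }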
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
index 5500352..1745eb0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetValuesReader.java
@@ -1,25 +1,35 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.iceberg.parquet.vectorized;
import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.*;
-import org.apache.iceberg.parquet.BytesReader;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.iceberg.parquet.ValuesAsBytesReader;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
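The reader below decodes Parquet's RLE/bit-packed hybrid: each group begins with a varint header whose low bit selects the mode, and the remaining bits carry either the run length (RLE) or the number of 8-value groups (PACKED), exactly as readNextGroup implements further down. The header arithmetic as a runnable worked example (plain Java, no Parquet classes):

    class HybridHeaderSketch {
      public static void main(String[] args) {
        // RLE group: header = runLength << 1, low bit 0; one padded value follows
        int rleHeader = 50 << 1;
        System.out.println((rleHeader & 1) == 0);     // true -> RLE mode
        System.out.println(rleHeader >>> 1);          // 50   -> run length

        // PACKED group: header = (numGroups << 1) | 1; values arrive 8 at a time
        int packedHeader = (3 << 1) | 1;
        System.out.println((packedHeader & 1) == 1);  // true -> PACKED mode
        System.out.println((packedHeader >>> 1) * 8); // 24   -> values in group
      }
    }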
@@ -29,1205 +39,1331 @@ import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
/**
- * A values reader for Parquet's run-length encoded data that reads column data in batches
- * instead of one value at a time.
- * This is based off of the version in Apache Spark with these changes:
+ * A values reader for Parquet's run-length encoded data that reads column data in batches instead of one value at a
+ * time. This is based on the version in Apache Spark with these changes:
* <p>
* <tr>Writes batches of values retrieved to Arrow vectors</tr>
* <tr>If all pages of a column within the row group are not dictionary encoded, then
- * dictionary ids are eagerly decoded into actual values before writing them
- * to the Arrow vectors</tr>
+ * dictionary ids are eagerly decoded into actual values before writing them to the Arrow vectors</tr>
* </p>
*/
public final class VectorizedParquetValuesReader extends ValuesReader {
- // Current decoding mode. The encoded data contains groups of either run length encoded data
- // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
- // the number of values in the group.
- private enum MODE {
- RLE,
- PACKED
- }
+ // Current decoding mode. The encoded data contains groups of either run length encoded data
+ // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
+ // the number of values in the group.
+ private enum MODE {
+ RLE,
+ PACKED
+ }
- // Encoded data.
- private ByteBufferInputStream in;
+ // Encoded data.
+ private ByteBufferInputStream inputStream;
- // bit/byte width of decoded data and utility to batch unpack them.
- private int bitWidth;
- private int bytesWidth;
- private BytePacker packer;
+ // bit/byte width of decoded data and utility to batch unpack them.
+ private int bitWidth;
+ private int bytesWidth;
+ private BytePacker packer;
- // Current decoding mode and values
- private MODE mode;
- private int currentCount;
- private int currentValue;
+ // Current decoding mode and values
+ private MODE mode;
+ private int currentCount;
+ private int currentValue;
- // Buffer of decoded values if the values are PACKED.
- private int[] packedValuesBuffer = new int[16];
- private int packedValuesBufferIdx = 0;
+ // Buffer of decoded values if the values are PACKED.
+ private int[] packedValuesBuffer = new int[16];
+ private int packedValuesBufferIdx = 0;
- // If true, the bit width is fixed. This decoder is used in different places and this also
- // controls if we need to read the bitwidth from the beginning of the data stream.
- private final boolean fixedWidth;
- private final boolean readLength;
- private final int maxDefLevel;
+ // If true, the bit width is fixed. This decoder is used in different places and this also
+ // controls if we need to read the bitwidth from the beginning of the data stream.
+ private final boolean fixedWidth;
+ private final boolean readLength;
+ private final int maxDefLevel;
- public VectorizedParquetValuesReader(int maxDefLevel) {
- this.maxDefLevel = maxDefLevel;
- this.fixedWidth = false;
- this.readLength = false;
- }
+ public VectorizedParquetValuesReader(int maxDefLevel) {
+ this.maxDefLevel = maxDefLevel;
+ this.fixedWidth = false;
+ this.readLength = false;
+ }
- public VectorizedParquetValuesReader(
- int bitWidth,
- int maxDefLevel) {
- this.fixedWidth = true;
- this.readLength = bitWidth != 0;
- this.maxDefLevel = maxDefLevel;
- init(bitWidth);
- }
+ public VectorizedParquetValuesReader(
+ int bitWidth,
+ int maxDefLevel) {
+ this.fixedWidth = true;
+ this.readLength = bitWidth != 0;
+ this.maxDefLevel = maxDefLevel;
+ init(bitWidth);
+ }
- public VectorizedParquetValuesReader(
- int bitWidth,
- boolean readLength,
- int maxDefLevel) {
- this.fixedWidth = true;
- this.readLength = readLength;
- this.maxDefLevel = maxDefLevel;
- init(bitWidth);
- }
+ public VectorizedParquetValuesReader(
+ int bw,
+ boolean rl,
+ int mdl) {
+ this.fixedWidth = true;
+ this.readLength = rl;
+ this.maxDefLevel = mdl;
+ init(bw);
+ }
- @Override
- public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
- this.in = in;
- if (fixedWidth) {
- // initialize for repetition and definition levels
- if (readLength) {
- int length = readIntLittleEndian();
- this.in = in.sliceStream(length);
- }
- } else {
- // initialize for values
- if (in.available() > 0) {
- init(in.read());
- }
- }
- if (bitWidth == 0) {
- // 0 bit width, treat this as an RLE run of valueCount number of 0's.
- this.mode = MODE.RLE;
- this.currentCount = valueCount;
- this.currentValue = 0;
- } else {
- this.currentCount = 0;
- }
+ @Override
+ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+ this.inputStream = in;
+ if (fixedWidth) {
+ // initialize for repetition and definition levels
+ if (readLength) {
+ int length = readIntLittleEndian();
+ this.inputStream = in.sliceStream(length);
+ }
+ } else {
+ // initialize for values
+ if (in.available() > 0) {
+ init(in.read());
+ }
}
-
-
- /**
- * Initializes the internal state for decoding ints of `bitWidth`.
- */
- private void init(int bitWidth) {
- Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
- this.bitWidth = bitWidth;
- this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
- this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+ if (bitWidth == 0) {
+ // 0 bit width, treat this as an RLE run of valueCount number of 0's.
+ this.mode = MODE.RLE;
+ this.currentCount = valueCount;
+ this.currentValue = 0;
+ } else {
+ this.currentCount = 0;
}
+ }
- /**
- * Reads the next varint encoded int.
- */
- private int readUnsignedVarInt() throws IOException {
- int value = 0;
- int shift = 0;
- int b;
- do {
- b = in.read();
- value |= (b & 0x7F) << shift;
- shift += 7;
- } while ((b & 0x80) != 0);
- return value;
- }
+ /**
+ * Initializes the internal state for decoding ints of `bitWidth`.
+ */
+ private void init(int bw) {
+ Preconditions.checkArgument(bw >= 0 && bw <= 32, "bitWidth must be >= 0 and <= 32");
+ this.bitWidth = bw;
+ this.bytesWidth = BytesUtils.paddedByteCountFromBits(bw);
+ this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bw);
+ }
- /**
- * Reads the next 4 byte little endian int.
- */
- private int readIntLittleEndian() throws IOException {
- int ch4 = in.read();
- int ch3 = in.read();
- int ch2 = in.read();
- int ch1 = in.read();
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
+ /**
+ * Reads the next varint encoded int.
+ */
+ private int readUnsignedVarInt() throws IOException {
+ int value = 0;
+ int shift = 0;
+ int byteRead;
+ do {
+ byteRead = inputStream.read();
+ value |= (byteRead & 0x7F) << shift;
+ shift += 7;
+ } while ((byteRead & 0x80) != 0);
+ return value;
+ }
- /**
- * Reads the next byteWidth little endian int.
- */
- private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
- switch (bytesWidth) {
- case 0:
- return 0;
- case 1:
- return in.read();
- case 2: {
- int ch2 = in.read();
- int ch1 = in.read();
- return (ch1 << 8) + ch2;
- }
- case 3: {
- int ch3 = in.read();
- int ch2 = in.read();
- int ch1 = in.read();
- return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
- }
- case 4: {
- return readIntLittleEndian();
- }
- }
- throw new RuntimeException("Unreachable");
+ /**
+ * Reads the next 4 byte little endian int.
+ */
+ private int readIntLittleEndian() throws IOException {
+ int ch4 = inputStream.read();
+ int ch3 = inputStream.read();
+ int ch2 = inputStream.read();
+ int ch1 = inputStream.read();
+ return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0);
+ }
+
+ /**
+ * Reads the next byteWidth little endian int.
+ */
+ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
+ switch (bytesWidth) {
+ case 0:
+ return 0;
+ case 1:
+ return inputStream.read();
+ case 2: {
+ int ch2 = inputStream.read();
+ int ch1 = inputStream.read();
+ return (ch1 << 8) + ch2;
+ }
+ case 3: {
+ int ch3 = inputStream.read();
+ int ch2 = inputStream.read();
+ int ch1 = inputStream.read();
+ return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+ }
+ case 4: {
+ return readIntLittleEndian();
+ }
}
+ throw new RuntimeException("Unreachable");
+ }
- /**
- * Reads the next group.
- */
- private void readNextGroup() {
- try {
- int header = readUnsignedVarInt();
- this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
- switch (mode) {
- case RLE:
- this.currentCount = header >>> 1;
- this.currentValue = readIntLittleEndianPaddedOnBitWidth();
- return;
- case PACKED:
- int numGroups = header >>> 1;
- this.currentCount = numGroups * 8;
+ /**
+ * Reads the next group.
+ */
+ private void readNextGroup() {
+ try {
+ int header = readUnsignedVarInt();
+ this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+ switch (mode) {
+ case RLE:
+ this.currentCount = header >>> 1;
+ this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+ return;
+ case PACKED:
+ int numGroups = header >>> 1;
+ this.currentCount = numGroups * 8;
- if (this.packedValuesBuffer.length < this.currentCount) {
- this.packedValuesBuffer = new int[this.currentCount];
- }
- packedValuesBufferIdx = 0;
- int valueIndex = 0;
- while (valueIndex < this.currentCount) {
- // values are bit packed 8 at a time, so reading bitWidth will always work
- ByteBuffer buffer = in.slice(bitWidth);
- this.packer.unpack8Values(buffer, buffer.position(), this.packedValuesBuffer, valueIndex);
- valueIndex += 8;
- }
- return;
- default:
- throw new ParquetDecodingException("not a valid mode " + this.mode);
- }
- } catch (IOException e) {
- throw new ParquetDecodingException("Failed to read from input stream", e);
- }
+ if (this.packedValuesBuffer.length < this.currentCount) {
+ this.packedValuesBuffer = new int[this.currentCount];
+ }
+ packedValuesBufferIdx = 0;
+ int valueIndex = 0;
+ while (valueIndex < this.currentCount) {
+ // values are bit packed 8 at a time, so reading bitWidth will always work
+ ByteBuffer buffer = inputStream.slice(bitWidth);
+ this.packer.unpack8Values(buffer, buffer.position(), this.packedValuesBuffer, valueIndex);
+ valueIndex += 8;
+ }
+ return;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + this.mode);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read from input stream", e);
}
+ }
- @Override
- public boolean readBoolean() {
- return this.readInteger() != 0;
- }
+ @Override
+ public boolean readBoolean() {
+ return this.readInteger() != 0;
+ }
- @Override
- public void skip() {
- this.readInteger();
- }
+ @Override
+ public void skip() {
+ this.readInteger();
+ }
- @Override
- public int readValueDictionaryId() {
- return readInteger();
- }
+ @Override
+ public int readValueDictionaryId() {
+ return readInteger();
+ }
- @Override
- public int readInteger() {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
+ @Override
+ public int readInteger() {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
- this.currentCount--;
- switch (mode) {
- case RLE:
- return this.currentValue;
- case PACKED:
- return this.packedValuesBuffer[packedValuesBufferIdx++];
- }
- throw new RuntimeException("Unreachable");
+ this.currentCount--;
+ switch (mode) {
+ case RLE:
+ return this.currentValue;
+ case PACKED:
+ return this.packedValuesBuffer[packedValuesBufferIdx++];
}
+ throw new RuntimeException("Unreachable");
+ }
- public void readBatchOfDictionaryIds(
- final IntVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
+ public void readBatchOfDictionaryIds(
+ final IntVector vector,
+ final int numValsInVector,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader dictionaryEncodedValuesReader) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int numValues = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, idx, numValues);
+ } else {
+ setNulls(nullabilityHolder, idx, numValues, vector.getValidityBuffer());
+ }
+ idx += numValues;
+ break;
+ case PACKED:
+ for (int i = 0; i < numValues; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.set(idx, dictionaryEncodedValuesReader.readInteger());
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
}
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- dictionaryEncodedValuesReader.readDictionaryIdsInternal(vector, idx, n);
- } else {
- setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- vector.set(idx, dictionaryEncodedValuesReader.readInteger());
- } else {
- setNull(nullabilityHolder, idx, vector.getValidityBuffer());
- }
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= numValues;
+ currentCount -= numValues;
}
+ }
- // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
- // check definition level.
- private void readDictionaryIdsInternal(final IntVector c, final int numValsInVector, final int numValuesToRead) {
- int left = numValuesToRead;
- int idx = numValsInVector;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- c.set(idx, currentValue);
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- c.set(idx, packedValuesBuffer[packedValuesBufferIdx]);
- packedValuesBufferIdx++;
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ // Used for reading dictionary ids in a vectorized fashion. Unlike other methods, this doesn't
+ // check definition level.
+ private void readDictionaryIdsInternal(
+ final IntVector intVector,
+ final int numValsInVector,
+ final int numValuesToRead) {
+ int left = numValuesToRead;
+ int idx = numValsInVector;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int numValues = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < numValues; i++) {
+ intVector.set(idx, currentValue);
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < numValues; i++) {
+ intVector.set(idx, packedValuesBuffer[packedValuesBufferIdx]);
+ packedValuesBufferIdx++;
+ idx++;
+ }
+ break;
+ }
+ left -= numValues;
+ currentCount -= numValues;
}
+ }
- public void readBatchOfIntegers(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- setNextNValuesInVector(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- vector,
- n);
- bufferIdx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
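+  /**
+   * Reads a batch of LONG values into the vector's data buffer. For an RLE run at the max definition
+   * level the values are copied in bulk; otherwise values and nulls are written one position at a time.
+   */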
+ public void readBatchOfLongs(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int numValues = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ numValues);
+ bufferIdx += numValues;
+ break;
+ case PACKED:
+ for (int i = 0; i < numValues; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(
+ typeWidth,
+ valuesReader,
+ bufferIdx,
+ vector.getValidityBuffer(),
+ vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= numValues;
+ currentCount -= numValues;
}
+ }
- public void readBatchOfDictionaryEncodedIntegers(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setInt(idx, dict.decodeToInt(valuesReader.readInteger()));
- } else {
- setNull(nullabilityHolder, idx, vector.getValidityBuffer());
- }
- idx++;
- }
- break;
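+  /**
+   * Reads a batch of dictionary-encoded LONG values, decoding each dictionary id via
+   * dict.decodeToLong and writing the result into the vector's data buffer.
+   */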
+ public void readBatchOfDictionaryEncodedLongs(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int typeWidth,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader valuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int numValues = Math.min(left, this.currentCount);
+ ArrowBuf validityBuffer = vector.getValidityBuffer();
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, numValues, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, numValues, validityBuffer);
+ }
+ idx += numValues;
+ break;
+ case PACKED:
+ for (int i = 0; i < numValues; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, validityBuffer);
}
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= numValues;
+ currentCount -= numValues;
}
+ }
- private void readBatchOfDictionaryEncodedIntegersInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- ArrowBuf dataBuffer = vector.getDataBuffer();
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- dataBuffer.setInt(idx, dict.decodeToInt(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- dataBuffer.setInt(idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedLongsInternal(
+ FieldVector vector,
+ int typeWidth,
+ int index,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ int idx = index;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int numValues = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < numValues; i++) {
+            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < numValues; i++) {
+            vector.getDataBuffer()
+                .setLong(idx * typeWidth, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= numValues;
+ currentCount -= numValues;
}
+ }
- public void readBatchOfLongs(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- setNextNValuesInVector(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- vector,
- n);
- bufferIdx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
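+  /**
+   * Reads a batch of INT32 values into the vector's data buffer, using definition levels to decide
+   * between values and nulls.
+   */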
+ public void readBatchOfIntegers(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ n);
+ bufferIdx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfDictionaryEncodedLongs(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedLongsInternal(vector, typeWidth, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, validityBuffer);
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setLong(idx, dict.decodeToLong(valuesReader.readInteger()));
- } else {
- setNull(nullabilityHolder, idx, validityBuffer);
- }
- idx++;
- }
- break;
+ public void readBatchOfDictionaryEncodedIntegers(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int typeWidth,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader valuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedIntegersInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer().setInt(idx * typeWidth, dict.decodeToInt(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void readBatchOfDictionaryEncodedLongsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setLong(idx, dict.decodeToLong(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setLong(idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedIntegersInternal(
+ FieldVector vector,
+ int typeWidth,
+ int idx,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ ArrowBuf dataBuffer = vector.getDataBuffer();
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+            dataBuffer.setInt(idx * typeWidth, dict.decodeToInt(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+            dataBuffer.setInt(idx * typeWidth, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfFloats(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- setNextNValuesInVector(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- vector,
- n);
- bufferIdx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
+ public void readBatchOfFloats(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ n);
+ bufferIdx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void setValue(int typeWidth, BytesReader valuesReader, int bufferIdx, ArrowBuf validityBuffer, ArrowBuf dataBuffer) {
- dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
- BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx);
- bufferIdx++;
- }
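+  /**
+   * Copies typeWidth bytes for a single plain-encoded value into the data buffer at bufferIdx and
+   * marks that position valid.
+   */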
+ private void setValue(
+ int typeWidth,
+ ValuesAsBytesReader valuesReader,
+ int bufferIdx,
+ ArrowBuf validityBuffer,
+ ArrowBuf dataBuffer) {
+ dataBuffer.setBytes(bufferIdx * typeWidth, valuesReader.getBuffer(typeWidth));
+ BitVectorHelper.setValidityBitToOne(validityBuffer, bufferIdx);
+ }
- public void readBatchOfDictionaryEncodedFloats(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, validityBuffer);
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(valuesReader.readInteger()));
- } else {
- setNull(nullabilityHolder, idx, validityBuffer);
- }
- idx++;
- }
- break;
+ public void readBatchOfDictionaryEncodedFloats(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int typeWidth,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader valuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ ArrowBuf validityBuffer = vector.getValidityBuffer();
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedFloatsInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, validityBuffer);
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, validityBuffer);
}
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void readBatchOfDictionaryEncodedFloatsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setFloat(idx, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedFloatsInternal(
+ FieldVector vector,
+ int typeWidth,
+ int idx,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfDoubles(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- setNextNValuesInVector(
- typeWidth,
- maxDefLevel,
- nullabilityHolder,
- valuesReader,
- bufferIdx,
- vector,
- n);
- bufferIdx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
+ public void readBatchOfDoubles(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ setNextNValuesInVector(
+ typeWidth,
+ nullabilityHolder,
+ valuesReader,
+ bufferIdx,
+ vector,
+ n);
+ bufferIdx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setValue(typeWidth, valuesReader, bufferIdx, vector.getValidityBuffer(), vector.getDataBuffer());
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfDictionaryEncodedDoubles(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(valuesReader.readInteger()));
- } else {
- setNull(nullabilityHolder, idx, vector.getValidityBuffer());
- }
- idx++;
- }
- break;
+ public void readBatchOfDictionaryEncodedDoubles(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int typeWidth,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader valuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedDoublesInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(valuesReader.readInteger()));
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void readBatchOfDictionaryEncodedDoublesInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setDouble(idx, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedDoublesInternal(
+ FieldVector vector,
+ int typeWidth,
+ int idx,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+            vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfFixedWidthBinary(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
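+  /**
+   * Reads a batch of fixed-width binary values into a VarBinaryVector, one typeWidth-byte value at
+   * a time.
+   */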
+ public void readBatchOfFixedWidthBinary(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+ bufferIdx++;
}
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
- bufferIdx++;
- }
- } else {
- setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
- bufferIdx += n;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setBinaryInVector((VarBinaryVector) vector, typeWidth, valuesReader, bufferIdx);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfDictionaryEncodedFixedWidthBinary(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
- } else {
- setNull(nullabilityHolder, idx, vector.getValidityBuffer());
- }
- idx++;
- }
- break;
+ public void readBatchOfDictionaryEncodedFixedWidthBinary(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int typeWidth,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader valuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedFixedWidthBinaryInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ vector.getDataBuffer()
+ .setBytes(idx * typeWidth, dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe());
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
- BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
- BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedFixedWidthBinaryInternal(
+ FieldVector vector,
+ int typeWidth,
+ int idx,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer().setBytes(idx * typeWidth, dict.decodeToBinary(currentValue).getBytesUnsafe());
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ vector.getDataBuffer()
+ .setBytes(
+ idx * typeWidth,
+ dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfFixedLengthDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
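+  /**
+   * Reads a batch of fixed-length decimals. Each big-endian value is right-aligned in a 16-byte
+   * array and written to the DecimalVector with setBigEndian.
+   */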
+ public void readBatchOfFixedLengthDecimals(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+ bufferIdx++;
}
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
- bufferIdx++;
- }
- } else {
- setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
- bufferIdx += n;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfDictionaryEncodedFixedLengthDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
- } else {
- setNull(nullabilityHolder, idx, vector.getValidityBuffer());
- }
- idx++;
- }
- break;
+ public void readBatchOfDictionaryEncodedFixedLengthDecimals(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int typeWidth,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader valuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ byte[] decimalBytes = dict.decodeToBinary(valuesReader.readInteger()).getBytesUnsafe();
+ byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+ System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(FieldVector vector, int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- // TODO: samarth I am assuming/hopeful that the decimalBytes array has typeWidth length
- byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- // TODO: samarth I am assuming/hopeful that the decimal bytes has typeWidth length
- byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
- byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
- System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
- ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedFixedLengthDecimalsInternal(
+ FieldVector vector,
+ int typeWidth,
+ int idx,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+            // TODO(samarth): verify that the decoded decimalBytes array always has typeWidth length
+ byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
+ byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+ System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+            // TODO(samarth): verify that the decoded decimal bytes always have typeWidth length
+ byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
+ byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
+ System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+ ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- /**
- * Method for reading a batch of non-decimal numeric data types (INT32, INT64, FLOAT, DOUBLE, DATE, TIMESTAMP)
- * This method reads batches of bytes from Parquet and writes them into the data buffer underneath the Arrow
- * vector. It appropriately sets the validity buffer in the Arrow vector.
- */
- public void readBatchVarWidth(
- final FieldVector vector,
- final int numValsInVector,
- final int batchSize,
- NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
+  /**
+   * Method for reading a batch of variable-width data (STRING, BINARY). This method reads batches of bytes from
+   * Parquet and writes them into the data buffer underneath the Arrow vector. It appropriately sets the validity
+   * buffer in the Arrow vector.
+   */
+ public void readBatchVarWidth(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
+ bufferIdx++;
}
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
- bufferIdx++;
- }
- } else {
- setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
- bufferIdx += n;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ setVarWidthBinaryValue(vector, valuesReader, bufferIdx);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void setVarWidthBinaryValue(FieldVector vector, BytesReader valuesReader, int bufferIdx) {
- int len = valuesReader.readInteger();
- ByteBuffer buffer = valuesReader.getBuffer(len);
- // Calling setValueLengthSafe takes care of allocating a larger buffer if
- // running out of space.
- ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
- // It is possible that the data buffer was reallocated. So it is important to
- // not cache the data buffer reference but instead use vector.getDataBuffer().
- vector.getDataBuffer().writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
- // Similarly, we need to get the latest reference to the validity buffer as well
- // since reallocation changes reference of the validity buffers as well.
- BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
- }
+ private void setVarWidthBinaryValue(FieldVector vector, ValuesAsBytesReader valuesReader, int bufferIdx) {
+ int len = valuesReader.readInteger();
+ ByteBuffer buffer = valuesReader.getBuffer(len);
+ // Calling setValueLengthSafe takes care of allocating a larger buffer if
+ // running out of space.
+ ((BaseVariableWidthVector) vector).setValueLengthSafe(bufferIdx, len);
+ // It is possible that the data buffer was reallocated. So it is important to
+ // not cache the data buffer reference but instead use vector.getDataBuffer().
+ vector.getDataBuffer().writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position());
+ // Similarly, we need to get the latest reference to the validity buffer as well
+ // since reallocation changes reference of the validity buffers as well.
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+ }
- public void readBatchOfDictionaryEncodedVarWidth(
- final FieldVector vector, final int numValsInVector,
- final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader dictionaryEncodedValuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
- }
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe());
- } else {
- setNull(nullabilityHolder, idx, vector.getValidityBuffer());
- }
- idx++;
- }
- break;
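+  /**
+   * Reads a batch of dictionary-encoded variable-width values, decoding each dictionary id to its
+   * binary value and writing it into the vector with setSafe.
+   */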
+ public void readBatchOfDictionaryEncodedVarWidth(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader dictionaryEncodedValuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedVarWidthBinaryInternal(vector, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ ((BaseVariableWidthVector) vector).setSafe(
+ idx,
+ dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe());
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(FieldVector vector, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedVarWidthBinaryInternal(
+ FieldVector vector,
+ int idx,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ ((BaseVariableWidthVector) vector).setSafe(idx, dict.decodeToBinary(currentValue).getBytesUnsafe());
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ ((BaseVariableWidthVector) vector).setSafe(
+ idx,
+ dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe());
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfIntLongBackedDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
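+  /**
+   * Reads a batch of decimals backed by INT32 or INT64 physical values; typeWidth determines how
+   * many bytes each value occupies in the plain-encoded stream.
+   */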
+ public void readBatchOfIntLongBackedDecimals(
+ final FieldVector vector, final int numValsInVector,
+ final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder,
+ ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+ vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
+ bufferIdx++;
}
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
- BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
- bufferIdx++;
- }
- } else {
- setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
- bufferIdx += n;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
- valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
- bufferIdx++;
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- } else {
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
- }
- }
- break;
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+ vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+ bufferIdx++;
+ } else {
+ nullabilityHolder.setNull(bufferIdx);
+ bufferIdx++;
}
- left -= n;
- currentCount -= n;
- }
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
- final FieldVector vector, final int numValsInVector,
- final int typeWidth, final int batchSize, NullabilityHolder nullabilityHolder, VectorizedParquetValuesReader valuesReader, Dictionary dict) {
- int idx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
+ public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
+ final FieldVector vector,
+ final int numValsInVector,
+ final int typeWidth,
+ final int batchSize,
+ NullabilityHolder nullabilityHolder,
+ VectorizedParquetValuesReader valuesReader,
+ Dictionary dict) {
+ int idx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
+ } else {
+ setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
+ }
+ idx += n;
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ ((DecimalVector) vector).set(
+ idx,
+ (typeWidth == Integer.BYTES
+ ? dict.decodeToInt(valuesReader.readInteger())
+ : dict.decodeToLong(valuesReader.readInteger())));
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
}
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- valuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(vector, typeWidth, idx, n, dict);
- } else {
- setNulls(nullabilityHolder, idx, n, vector.getValidityBuffer());
- }
- idx += n;
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(valuesReader.readInteger()) : dict.decodeToLong(valuesReader.readInteger())));
- } else {
- setNull(nullabilityHolder, idx, vector.getValidityBuffer());
- }
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(FieldVector vector, final int typeWidth, int idx, int numValuesToRead, Dictionary dict) {
- int left = numValuesToRead;
- while (left > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- for (int i = 0; i < n; i++) {
- ((DecimalVector) vector).set(idx, typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
- idx++;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; i++) {
- ((DecimalVector) vector).set(idx, (typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++])));
- idx++;
- }
- break;
- }
- left -= n;
- currentCount -= n;
- }
+ private void readBatchOfDictionaryEncodedIntLongBackedDecimalsInternal(
+ FieldVector vector,
+ final int typeWidth,
+ int idx,
+ int numValuesToRead,
+ Dictionary dict) {
+ int left = numValuesToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ for (int i = 0; i < n; i++) {
+ ((DecimalVector) vector).set(
+ idx,
+ typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
+ idx++;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; i++) {
+ ((DecimalVector) vector).set(
+ idx,
+ (typeWidth == Integer.BYTES
+ ? dict.decodeToInt(currentValue)
+ : dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++])));
+ idx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- public void readBatchOfBooleans(
- final FieldVector vector, final int numValsInVector, final int batchSize, NullabilityHolder nullabilityHolder, BytesReader valuesReader) {
- int bufferIdx = numValsInVector;
- int left = batchSize;
- while (left > 0) {
- if (this.currentCount == 0) {
- this.readNextGroup();
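+  /**
+   * Reads a batch of boolean values, writing each one as 0 or 1 into the Arrow BitVector and
+   * recording nulls in the NullabilityHolder.
+   */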
+  public void readBatchOfBooleans(
+      final FieldVector vector, final int numValsInVector, final int batchSize,
+      NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+ int bufferIdx = numValsInVector;
+ int left = batchSize;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+            ((BitVector) vector).setSafe(bufferIdx, valuesReader.readBoolean() ? 1 : 0);
+ bufferIdx++;
}
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
- bufferIdx++;
- }
- } else {
- setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
- bufferIdx += n;
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
- ((BitVector) vector).setSafe(bufferIdx, ((valuesReader.readBoolean() == false) ? 0 : 1));
- } else {
- setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
- }
- bufferIdx++;
- }
- break;
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, n, vector.getValidityBuffer());
+ bufferIdx += n;
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+            ((BitVector) vector).setSafe(bufferIdx, valuesReader.readBoolean() ? 1 : 0);
+ } else {
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
- left -= n;
- currentCount -= n;
- }
+ bufferIdx++;
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
}
+ }
- private void setBinaryInVector(VarBinaryVector vector, int typeWidth, BytesReader valuesReader, int bufferIdx) {
- byte[] byteArray = new byte[typeWidth];
- valuesReader.getBuffer(typeWidth).get(byteArray);
- vector.setSafe(bufferIdx, byteArray);
- }
+ private void setBinaryInVector(
+ VarBinaryVector vector,
+ int typeWidth,
+ ValuesAsBytesReader valuesReader,
+ int bufferIdx) {
+ byte[] byteArray = new byte[typeWidth];
+ valuesReader.getBuffer(typeWidth).get(byteArray);
+ vector.setSafe(bufferIdx, byteArray);
+ }
- private void setNextNValuesInVector(
- int typeWidth, int maxDefLevel, NullabilityHolder nullabilityHolder,
- BytesReader valuesReader, int bufferIdx, FieldVector vector, int n) {
- ArrowBuf validityBuffer = vector.getValidityBuffer();
- int validityBufferIdx = bufferIdx;
- if (currentValue == maxDefLevel) {
- for (int i = 0; i < n; i++) {
- BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- validityBufferIdx++;
- }
- ByteBuffer buffer = valuesReader.getBuffer(n * typeWidth);
- vector.getDataBuffer().setBytes(bufferIdx * typeWidth, buffer);
- } else {
- setNulls(nullabilityHolder, bufferIdx, n, validityBuffer);
- }
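+  /**
+   * For an RLE run at the max definition level, marks numValues positions valid and bulk-copies
+   * numValues * typeWidth bytes into the data buffer; otherwise marks the whole run as null.
+   */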
+ private void setNextNValuesInVector(
+ int typeWidth, NullabilityHolder nullabilityHolder,
+ ValuesAsBytesReader valuesReader, int bufferIdx, FieldVector vector, int numValues) {
+ ArrowBuf validityBuffer = vector.getValidityBuffer();
+ int validityBufferIdx = bufferIdx;
+ if (currentValue == maxDefLevel) {
+ for (int i = 0; i < numValues; i++) {
+ BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+ validityBufferIdx++;
+ }
+ ByteBuffer buffer = valuesReader.getBuffer(numValues * typeWidth);
+ vector.getDataBuffer().setBytes(bufferIdx * typeWidth, buffer);
+ } else {
+ setNulls(nullabilityHolder, bufferIdx, numValues, validityBuffer);
}
+ }
- private void setNull(NullabilityHolder nullabilityHolder, int bufferIdx, ArrowBuf validityBuffer) {
- nullabilityHolder.setNull(bufferIdx);
- BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
- }
+ private void setNull(NullabilityHolder nullabilityHolder, int bufferIdx, ArrowBuf validityBuffer) {
+ nullabilityHolder.setNull(bufferIdx);
+ BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+ }
- private void setNulls(NullabilityHolder nullabilityHolder, int bufferIdx, int n, ArrowBuf validityBuffer) {
- for (int i = 0; i < n; i++) {
- nullabilityHolder.setNull(bufferIdx);
- BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
- bufferIdx++;
- }
+ private void setNulls(NullabilityHolder nullabilityHolder, int idx, int numValues, ArrowBuf validityBuffer) {
+ int bufferIdx = idx;
+ for (int i = 0; i < numValues; i++) {
+ nullabilityHolder.setNull(bufferIdx);
+ BitVectorHelper.setValidityBit(validityBuffer, bufferIdx, 0);
+ bufferIdx++;
}
-
-}
+ }
+}
\ No newline at end of file
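The readBatchOf* methods in this file all follow the same RLE/bit-packed hybrid decoding loop, including the dictionary-decoding variant whose tail (dict.decodeToLong) appears at the start of this hunk: consume definition levels one group at a time, and treat a slot as non-null only when its level equals maxDefLevel. Below is a minimal, self-contained Java sketch of that loop; plain arrays stand in for Arrow's FieldVector and ValuesAsBytesReader, and every name in it (HybridDefLevelSketch, Group, readBatch) is illustrative rather than the actual Iceberg API.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class HybridDefLevelSketch {
      enum Mode { RLE, PACKED }

      // One decoded group of definition levels: a run-length-encoded run
      // or a bit-packed block.
      static final class Group {
        final Mode mode;
        final int count;      // number of slots this group covers
        final int rleValue;   // repeated definition level when mode == RLE
        final int[] packed;   // per-slot definition levels when mode == PACKED

        Group(Mode mode, int count, int rleValue, int[] packed) {
          this.mode = mode;
          this.count = count;
          this.rleValue = rleValue;
          this.packed = packed;
        }
      }

      // Fills out[startIdx .. startIdx + batchSize) from values, writing null
      // wherever the definition level is below maxDefLevel. Nulls consume no
      // value, just like the readers above.
      static void readBatch(List<Group> groups, int maxDefLevel, boolean[] values,
          Boolean[] out, int startIdx, int batchSize) {
        int bufferIdx = startIdx;
        int valueIdx = 0;
        int left = batchSize;
        int groupIdx = 0;
        int consumed = 0;   // slots already taken from the current group
        while (left > 0) {
          Group group = groups.get(groupIdx);
          int n = Math.min(left, group.count - consumed);
          switch (group.mode) {
            case RLE:
              if (group.rleValue == maxDefLevel) {
                for (int i = 0; i < n; i++) {
                  out[bufferIdx++] = values[valueIdx++];   // run of non-nulls
                }
              } else {
                for (int i = 0; i < n; i++) {
                  out[bufferIdx++] = null;                 // run of nulls
                }
              }
              break;
            case PACKED:
              for (int i = 0; i < n; i++) {
                if (group.packed[consumed + i] == maxDefLevel) {
                  out[bufferIdx++] = values[valueIdx++];
                } else {
                  out[bufferIdx++] = null;
                }
              }
              break;
          }
          consumed += n;
          if (consumed == group.count) {   // group exhausted; the real reader
            groupIdx++;                    // calls readNextGroup() here
            consumed = 0;
          }
          left -= n;
        }
      }

      public static void main(String[] args) {
        List<Group> groups = new ArrayList<>();
        groups.add(new Group(Mode.RLE, 3, 1, null));
        groups.add(new Group(Mode.PACKED, 4, 0, new int[] {1, 0, 0, 1}));
        boolean[] values = {true, false, true, true, false};
        Boolean[] out = new Boolean[7];
        readBatch(groups, 1, values, out, 0, 7);
        System.out.println(Arrays.toString(out));
        // prints [true, false, true, true, null, null, false]
      }
    }

The RLE arm can decide nullability once for the whole run, which is what enables the bulk fast paths in this file; the PACKED arm must consult one definition level per slot, mirroring the two switch arms of readBatchOfBooleans.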
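setNextNValuesInVector above does the analogous work for fixed-width types, and its rewrite also drops a maxDefLevel parameter that shadowed the field of the same name. The optimization worth noting: for a non-null RLE run it sets validity bits one at a time but moves the values themselves in a single bulk copy of numValues * typeWidth bytes (setBinaryInVector, by contrast, copies one typeWidth-sized value at a time into a VarBinaryVector). Here is a sketch of that bulk copy, assuming java.nio.ByteBuffer and java.util.BitSet in place of Arrow's ArrowBuf; the names are hypothetical:

    import java.nio.ByteBuffer;
    import java.util.BitSet;

    public class BulkCopySketch {
      // nonNullRun plays the role of the currentValue == maxDefLevel check.
      static void setNextNValues(int typeWidth, ByteBuffer source, ByteBuffer dataBuffer,
          BitSet validity, int bufferIdx, int numValues, boolean nonNullRun) {
        if (nonNullRun) {
          for (int i = 0; i < numValues; i++) {
            validity.set(bufferIdx + i);              // one validity bit per value
          }
          byte[] chunk = new byte[numValues * typeWidth];
          source.get(chunk);                          // single bulk read from the page
          dataBuffer.position(bufferIdx * typeWidth);
          dataBuffer.put(chunk);                      // single bulk write into the vector
        } else {
          for (int i = 0; i < numValues; i++) {
            validity.clear(bufferIdx + i);            // run of nulls: clear bits, copy nothing
          }
        }
      }

      public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.allocate(16);
        for (int i = 0; i < 4; i++) {
          source.putInt(i + 1);                       // four 4-byte ints: 1, 2, 3, 4
        }
        source.flip();
        ByteBuffer data = ByteBuffer.allocate(16);
        BitSet validity = new BitSet(4);
        setNextNValues(4, source, data, validity, 0, 4, true);
        data.flip();
        System.out.println(data.getInt() + " " + validity);   // prints: 1 {0, 1, 2, 3}
      }
    }

One buffer copy per run instead of one per value is what keeps the RLE fast path cheap for fixed-width columns.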
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 35bb5c7..4962ee8 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -19,6 +19,10 @@
package org.apache.iceberg.spark.data;
+import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+import static scala.collection.JavaConverters.seqAsJavaListConverter;
+
import com.google.common.collect.Lists;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -33,12 +37,11 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.parquet.org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
+import org.apache.iceberg.parquet.arrow.IcebergArrowColumnVector;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.serde2.io.DateWritable;
@@ -62,10 +65,6 @@ import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
import scala.collection.Seq;
-import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
-import static scala.collection.JavaConverters.mapAsJavaMapConverter;
-import static scala.collection.JavaConverters.seqAsJavaListConverter;
-
@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
public class TestHelpers {
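The TestHelpers hunk is pure import hygiene: the static imports move above the regular imports, the blank line inside the import block goes away, and a doubled package prefix on IcebergArrowColumnVector is corrected. Assuming a checkstyle import-order rule that places static imports first (the exact rule name is not shown in this commit), the target layout looks like this minimal compilable illustration, which is not Iceberg code:

    // Static imports first, one blank line, then regular imports with no
    // internal blank lines.
    import static java.util.Collections.singletonList;

    import java.util.List;

    public class ImportOrderExample {
      public static void main(String[] args) {
        List<String> names = singletonList("iceberg");
        System.out.println(names);   // prints: [iceberg]
      }
    }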
diff --git a/versions.props b/versions.props
index e418732..40c0d07 100644
--- a/versions.props
+++ b/versions.props
@@ -10,6 +10,7 @@ org.apache.spark:spark-hive_2.11 = 2.4.0
org.apache.pig:pig = 0.14.0
com.fasterxml.jackson.*:* = 2.7.9
com.github.ben-manes.caffeine:caffeine = 2.7.0
+org.apache.arrow:arrow-vector = 0.14.1
# test deps
junit:junit = 4.12
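The new versions.props entry pins Arrow at 0.14.1 for the entire build. Assuming the project resolves dependency versions through a plugin that reads versions.props (gradle-consistent-versions works this way; the exact plugin is not shown in this commit), a module can declare the coordinate with no version at all. A hypothetical module snippet:

    dependencies {
      // No version on the coordinate: 0.14.1 is supplied by the
      // org.apache.arrow:arrow-vector entry in versions.props.
      compile "org.apache.arrow:arrow-vector"
    }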