You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ry...@apache.org on 2021/04/29 17:36:07 UTC
[iceberg] branch master updated: Add Arrow vectorized reader (#2286)
This is an automated email from the ASF dual-hosted git repository.
rymurr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 657de68 Add Arrow vectorized reader (#2286)
657de68 is described below
commit 657de686a1e74e9b5b77fb81e23192cfeecab057
Author: mayursrivastava <ma...@twosigma.com>
AuthorDate: Thu Apr 29 13:35:50 2021 -0400
Add Arrow vectorized reader (#2286)
Co-authored-by: Mayur Srivastava <ma...@gmail.com>
---
.../iceberg/arrow/vectorized/ArrowBatchReader.java | 104 +++
.../iceberg/arrow/vectorized/ArrowReader.java | 325 +++++++
.../arrow}/vectorized/ArrowVectorAccessor.java | 69 +-
.../arrow/vectorized/ArrowVectorAccessors.java | 70 ++
.../iceberg/arrow/vectorized/ColumnVector.java | 116 +++
.../iceberg/arrow/vectorized/ColumnarBatch.java | 88 ++
.../GenericArrowVectorAccessorFactory.java | 695 +++++++++++++++
.../arrow/vectorized/VectorizedArrowReader.java | 7 +-
.../arrow/vectorized/VectorizedReaderBuilder.java | 130 +++
.../vectorized/VectorizedTableScanIterable.java | 73 ++
.../iceberg/arrow/vectorized/ArrowReaderTest.java | 935 +++++++++++++++++++++
build.gradle | 10 +-
.../vectorized/ArrowVectorAccessorFactory.java | 110 +++
.../data/vectorized/ArrowVectorAccessors.java | 496 +----------
.../data/vectorized/IcebergArrowColumnVector.java | 5 +-
.../vectorized/VectorizedSparkParquetReaders.java | 107 +--
16 files changed, 2711 insertions(+), 629 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
new file mode 100644
index 0000000..9eaed19
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * A collection of vectorized readers per column (in the expected read schema) and Arrow Vector holders. This class owns
+ * the Arrow vectors and is responsible for closing the Arrow vectors.
+ */
+class ArrowBatchReader implements VectorizedReader<ColumnarBatch> {
+
+ private final VectorizedArrowReader[] readers;
+ private final VectorHolder[] vectorHolders;
+
+ ArrowBatchReader(List<VectorizedReader<?>> readers) {
+ this.readers = readers.stream()
+ .map(VectorizedArrowReader.class::cast)
+ .toArray(VectorizedArrowReader[]::new);
+ this.vectorHolders = new VectorHolder[readers.size()];
+ }
+
+ @Override
+ public final void setRowGroupInfo(
+ PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
+ for (VectorizedArrowReader reader : readers) {
+ reader.setRowGroupInfo(pageStore, metaData, rowPosition);
+ }
+ }
+
+ @Override
+ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
+ Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
+
+ if (reuse == null) {
+ closeVectors();
+ }
+
+ ColumnVector[] columnVectors = new ColumnVector[readers.length];
+ for (int i = 0; i < readers.length; i += 1) {
+ vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+ int numRowsInVector = vectorHolders[i].numValues();
+ Preconditions.checkState(
+ numRowsInVector == numRowsToRead,
+ "Number of rows in the vector %s didn't match expected %s ", numRowsInVector,
+ numRowsToRead);
+ // Handle null vector for constant case
+ columnVectors[i] = new ColumnVector(vectorHolders[i]);
+ }
+ return new ColumnarBatch(numRowsToRead, columnVectors);
+ }
+
+ private void closeVectors() {
+ for (int i = 0; i < vectorHolders.length; i++) {
+ if (vectorHolders[i] != null) {
+ // Release any resources used by the vector
+ if (vectorHolders[i].vector() != null) {
+ vectorHolders[i].vector().close();
+ }
+ vectorHolders[i] = null;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ for (VectorizedReader<?> reader : readers) {
+ reader.close();
+ }
+ closeVectors();
+ }
+
+ @Override
+ public void setBatchSize(int batchSize) {
+ for (VectorizedArrowReader reader : readers) {
+ if (reader != null) {
+ reader.setBatchSize(batchSize);
+ }
+ }
+ }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
new file mode 100644
index 0000000..60b79a6
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Vectorized reader that returns an iterator of {@link ColumnarBatch}.
+ * See {@link #open(CloseableIterable)} to learn about the
+ * behavior of the iterator.
+ *
+ * <p>The following Iceberg data types are supported and have been tested:
+ * <ul>
+ * <li>Iceberg: {@link Types.BooleanType}, Arrow: {@link MinorType#BIT}</li>
+ * <li>Iceberg: {@link Types.IntegerType}, Arrow: {@link MinorType#INT}</li>
+ * <li>Iceberg: {@link Types.LongType}, Arrow: {@link MinorType#BIGINT}</li>
+ * <li>Iceberg: {@link Types.FloatType}, Arrow: {@link MinorType#FLOAT4}</li>
+ * <li>Iceberg: {@link Types.DoubleType}, Arrow: {@link MinorType#FLOAT8}</li>
+ * <li>Iceberg: {@link Types.StringType}, Arrow: {@link MinorType#VARCHAR}</li>
+ * <li>Iceberg: {@link Types.TimestampType} (both with and without timezone),
+ * Arrow: {@link MinorType#TIMEMICRO}</li>
+ * <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
+ * <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ * </ul>
+ *
+ * <p>Features that don't work in this implementation:
+ * <ul>
+ * <li>Type promotion: In case of type promotion, the Arrow vector corresponding to
+ * the data type in the parquet file is returned instead of the data type in the latest schema.
+ * See https://github.com/apache/iceberg/issues/2483.</li>
+ * <li>Columns with constant values are physically encoded as a dictionary. The Arrow vector
+ * type is int32 instead of the type as per the schema.
+ * See https://github.com/apache/iceberg/issues/2484.</li>
+ * <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
+ * {@link Types.StructType}, {@link Types.UUIDType}, {@link Types.FixedType} and
+ * {@link Types.DecimalType}.
+ * See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
+ * <li>Iceberg v2 spec is not supported.
+ * See https://github.com/apache/iceberg/issues/2487.</li>
+ * </ul>
+ */
+public class ArrowReader extends CloseableGroup {
+
+ private final Schema schema;
+ private final FileIO io;
+ private final EncryptionManager encryption;
+ private final int batchSize;
+ private final boolean reuseContainers;
+
+ /**
+ * Create a new instance of the reader.
+ *
+ * @param scan the table scan object.
+ * @param batchSize the maximum number of rows per Arrow batch.
+ * @param reuseContainers whether to reuse Arrow vectors when iterating through the data.
+ * If set to {@code false}, every {@link Iterator#next()} call creates
+ * new instances of Arrow vectors.
+ * If set to {@code true}, the Arrow vectors in the previous
+ * {@link Iterator#next()} may be reused for the data returned
+ * in the current {@link Iterator#next()}.
+ * This option avoids allocating memory again and again.
+ * Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+ * in the previous {@link Iterator#next()} call are closed before creating
+ * new instances in the current {@link Iterator#next()}.
+ */
+ public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
+ this.schema = scan.schema();
+ this.io = scan.table().io();
+ this.encryption = scan.table().encryption();
+ this.batchSize = batchSize;
+ // no scan planning happens here; the tasks to read are supplied later via open(CloseableIterable)
+ this.reuseContainers = reuseContainers;
+ }
+
+ /**
+ * Returns a new iterator of {@link ColumnarBatch} objects.
+ * <p>
+ * Note that the reader owns the {@link ColumnarBatch} objects and takes care of closing them.
+ * The caller should not hold onto a {@link ColumnarBatch} or try to close them.
+ *
+ * <p>If {@code reuseContainers} is {@code false}, the Arrow vectors in the
+ * previous {@link ColumnarBatch} are closed before returning the next {@link ColumnarBatch} object.
+ * This implies that the caller should either use the {@link ColumnarBatch} or transfer the ownership of
+ * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+ *
+ * <p>If {@code reuseContainers} is {@code true}, the Arrow vectors in the
+ * previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
+ * This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
+ * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+ */
+ public CloseableIterator<ColumnarBatch> open(CloseableIterable<CombinedScanTask> tasks) {
+ CloseableIterator<ColumnarBatch> itr = new VectorizedCombinedScanIterator(
+ tasks,
+ schema,
+ null,
+ io,
+ encryption,
+ true,
+ batchSize,
+ reuseContainers
+ );
+ addCloseable(itr);
+ return itr;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close(); // close data files
+ }
+
+ /**
+ * Reads the data file and returns an iterator of {@link ColumnarBatch}, each of which can be
+ * exposed as a {@link VectorSchemaRoot}. Only Parquet data file format is supported.
+ */
+ private static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
+
+ private final List<FileScanTask> fileTasks;
+ private final Iterator<FileScanTask> fileItr;
+ private final Map<String, InputFile> inputFiles;
+ private final Schema expectedSchema;
+ private final String nameMapping;
+ private final boolean caseSensitive;
+ private final int batchSize;
+ private final boolean reuseContainers;
+ private CloseableIterator<ColumnarBatch> currentIterator;
+ private ColumnarBatch current;
+ private FileScanTask currentTask;
+
+ /**
+ * Create a new instance.
+ *
+ * @param tasks Combined file scan tasks.
+ * @param expectedSchema Read schema. The returned data will have this schema.
+ * @param nameMapping Mapping from external schema names to Iceberg type IDs.
+ * @param io File I/O.
+ * @param encryptionManager Encryption manager.
+ * @param caseSensitive If {@code true}, column names are case sensitive.
+ * If {@code false}, column names are not case sensitive.
+ * @param batchSize Batch size in number of rows. Each Arrow batch contains
+ * a maximum of {@code batchSize} rows.
+ * @param reuseContainers If set to {@code false}, every {@link Iterator#next()} call creates
+ * new instances of Arrow vectors.
+ * If set to {@code true}, the Arrow vectors in the previous
+ * {@link Iterator#next()} may be reused for the data returned
+ * in the current {@link Iterator#next()}.
+ * This option avoids allocating memory again and again.
+ * Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+ * in the previous {@link Iterator#next()} call are closed before creating
+ * new instances in the current {@link Iterator#next()}.
+ */
+ VectorizedCombinedScanIterator(
+ CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
+ this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+ .map(CombinedScanTask::files)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ this.fileItr = fileTasks.iterator();
+ this.inputFiles = Collections.unmodifiableMap(fileTasks.stream()
+ .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()))
+ .map(encryptionManager::decrypt)
+ .collect(Collectors.toMap(InputFile::location, Function.identity())));
+ this.currentIterator = CloseableIterator.empty();
+ this.expectedSchema = expectedSchema;
+ this.nameMapping = nameMapping;
+ this.caseSensitive = caseSensitive;
+ this.batchSize = batchSize;
+ this.reuseContainers = reuseContainers;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ while (true) {
+ if (currentIterator.hasNext()) {
+ this.current = currentIterator.next();
+ return true;
+ } else if (fileItr.hasNext()) {
+ this.currentIterator.close();
+ this.currentTask = fileItr.next();
+ this.currentIterator = open(currentTask);
+ } else {
+ this.currentIterator.close();
+ return false;
+ }
+ }
+ } catch (IOException | RuntimeException e) {
+ if (currentTask != null && !currentTask.isDataTask()) {
+ throw new RuntimeException(
+ "Error reading file: " + getInputFile(currentTask).location() +
+ ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +
+ "Ensure that the tasks passed to the constructor are data tasks. " +
+ "The file scan tasks are: " + fileTasks,
+ e);
+ } else {
+ throw new RuntimeException(
+ "An error occurred while iterating through the file scan tasks or closing the iterator," +
+ " see the stacktrace for further information. The file scan tasks are: " + fileTasks, e);
+ }
+ }
+ }
+
+ @Override
+ public ColumnarBatch next() {
+ return current;
+ }
+
+ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
+ CloseableIterable<ColumnarBatch> iter;
+ InputFile location = getInputFile(task);
+ Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
+ if (task.file().format() == FileFormat.PARQUET) {
+ Parquet.ReadBuilder builder = Parquet.read(location)
+ .project(expectedSchema)
+ .split(task.start(), task.length())
+ .createBatchedReaderFunc(fileSchema -> buildReader(expectedSchema,
+ fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED))
+ .recordsPerBatch(batchSize)
+ .filter(task.residual())
+ .caseSensitive(caseSensitive);
+
+ if (reuseContainers) {
+ builder.reuseContainers();
+ }
+ if (nameMapping != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
+
+ iter = builder.build();
+ } else {
+ throw new UnsupportedOperationException(
+ "Format: " + task.file().format() + " not supported for batched reads");
+ }
+ return iter.iterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // close the current iterator
+ this.currentIterator.close();
+
+ // exhaust the task iterator
+ while (fileItr.hasNext()) {
+ fileItr.next();
+ }
+ }
+
+ private InputFile getInputFile(FileScanTask task) {
+ Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
+ return inputFiles.get(task.file().path().toString());
+ }
+
+ /**
+ * Build the {@link ArrowBatchReader} for the expected schema and file schema.
+ *
+ * @param expectedSchema Expected schema of the data returned.
+ * @param fileSchema Schema of the data file.
+ * @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
+ */
+ private static ArrowBatchReader buildReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector) {
+ return (ArrowBatchReader)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new VectorizedReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector, ImmutableMap.of(), ArrowBatchReader::new));
+ }
+ }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessor.java
similarity index 56%
rename from spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java
rename to arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessor.java
index c9c9959..c912531 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessor.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessor.java
@@ -17,79 +17,84 @@
* under the License.
*/
-package org.apache.iceberg.spark.data.vectorized;
+package org.apache.iceberg.arrow.vectorized;
import org.apache.arrow.vector.ValueVector;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.vectorized.ArrowColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarArray;
-import org.apache.spark.unsafe.types.UTF8String;
-
-@SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class ArrowVectorAccessor {
+public class ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ implements AutoCloseable {
private final ValueVector vector;
- private final ArrowColumnVector[] childColumns;
+ private final ChildVectorT[] childColumns;
- ArrowVectorAccessor(ValueVector vector) {
- this.vector = vector;
- this.childColumns = new ArrowColumnVector[0];
+ protected ArrowVectorAccessor(ValueVector vector) {
+ this(vector, null);
}
- ArrowVectorAccessor(ValueVector vector, ArrowColumnVector[] children) {
+ protected ArrowVectorAccessor(ValueVector vector, ChildVectorT[] children) {
this.vector = vector;
this.childColumns = children;
}
- final void close() {
- for (ArrowColumnVector column : childColumns) {
- // Closing an ArrowColumnVector is expected to not throw any exception
- column.close();
+ @Override
+ public void close() {
+ if (childColumns != null) {
+ for (ChildVectorT column : childColumns) {
+ try {
+ // Closing an ArrowColumnVector is expected to not throw any exception
+ column.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
vector.close();
}
- boolean getBoolean(int rowId) {
+ public boolean getBoolean(int rowId) {
throw new UnsupportedOperationException("Unsupported type: boolean");
}
- int getInt(int rowId) {
+ public int getInt(int rowId) {
throw new UnsupportedOperationException("Unsupported type: int");
}
- long getLong(int rowId) {
+ public long getLong(int rowId) {
throw new UnsupportedOperationException("Unsupported type: long");
}
- float getFloat(int rowId) {
+ public float getFloat(int rowId) {
throw new UnsupportedOperationException("Unsupported type: float");
}
- double getDouble(int rowId) {
+ public double getDouble(int rowId) {
throw new UnsupportedOperationException("Unsupported type: double");
}
- Decimal getDecimal(int rowId, int precision, int scale) {
- throw new UnsupportedOperationException("Unsupported type: decimal");
+ public byte[] getBinary(int rowId) {
+ throw new UnsupportedOperationException("Unsupported type: binary");
}
- UTF8String getUTF8String(int rowId) {
- throw new UnsupportedOperationException("Unsupported type: UTF8String");
+ public DecimalT getDecimal(int rowId, int precision, int scale) {
+ throw new UnsupportedOperationException("Unsupported type: decimal");
}
- byte[] getBinary(int rowId) {
- throw new UnsupportedOperationException("Unsupported type: binary");
+ public Utf8StringT getUTF8String(int rowId) {
+ throw new UnsupportedOperationException("Unsupported type: UTF8String");
}
- ColumnarArray getArray(int rowId) {
+ public ArrayT getArray(int rowId) {
throw new UnsupportedOperationException("Unsupported type: array");
}
- ArrowColumnVector childColumn(int pos) {
- return childColumns[pos];
+ public ChildVectorT childColumn(int pos) {
+ if (childColumns != null) {
+ return childColumns[pos];
+ } else {
+ throw new IndexOutOfBoundsException("Child columns is null hence cannot find index: " + pos);
+ }
}
- public ValueVector getVector() {
+ public final ValueVector getVector() {
return vector;
}
}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
new file mode 100644
index 0000000..ccebf33
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.nio.charset.StandardCharsets;
+import java.util.function.Supplier;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory.StringFactory;
+
+final class ArrowVectorAccessors {
+
+ private static final GenericArrowVectorAccessorFactory<?, String, ?, ?> factory;
+
+ static {
+ factory = new GenericArrowVectorAccessorFactory<>(
+ throwingSupplier("Decimal type is not supported"),
+ JavaStringFactory::new,
+ throwingSupplier("Struct type is not supported"),
+ throwingSupplier("List type is not supported")
+ );
+ }
+
+ private static <T> Supplier<T> throwingSupplier(String message) {
+ return () -> {
+ throw new UnsupportedOperationException(message);
+ };
+ }
+
+ private ArrowVectorAccessors() {
+ throw new UnsupportedOperationException(ArrowVectorAccessors.class.getName() + " cannot be instantiated.");
+ }
+
+ static ArrowVectorAccessor<?, String, ?, ?> getVectorAccessor(VectorHolder holder) {
+ return factory.getVectorAccessor(holder);
+ }
+
+ private static final class JavaStringFactory implements StringFactory<String> {
+ @Override
+ public Class<String> getGenericClass() {
+ return String.class;
+ }
+
+ @Override
+ public String ofRow(VarCharVector vector, int rowId) {
+ return ofBytes(vector.get(rowId));
+ }
+
+ @Override
+ public String ofBytes(byte[] bytes) {
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+ }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
new file mode 100644
index 0000000..e395926
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.types.Types;
+
+/**
+ * This class is inspired by Spark's {@code ColumnVector}.
+ * This class represents the column data for an Iceberg table query.
+ * It wraps an arrow {@link FieldVector} and provides simple
+ * accessors for the row values. Advanced users can access
+ * the {@link FieldVector}.
+ * <p>
+ * Supported Iceberg data types:
+ * <ul>
+ * <li>{@link Types.BooleanType}</li>
+ * <li>{@link Types.IntegerType}</li>
+ * <li>{@link Types.LongType}</li>
+ * <li>{@link Types.FloatType}</li>
+ * <li>{@link Types.DoubleType}</li>
+ * <li>{@link Types.StringType}</li>
+ * <li>{@link Types.BinaryType}</li>
+ * <li>{@link Types.TimestampType} (with and without timezone)</li>
+ * <li>{@link Types.DateType}</li>
+ * </ul>
+ */
+public class ColumnVector implements AutoCloseable {
+ private final VectorHolder vectorHolder;
+ private final ArrowVectorAccessor<?, String, ?, ?> accessor;
+ private final NullabilityHolder nullabilityHolder;
+
+ ColumnVector(VectorHolder vectorHolder) {
+ this.vectorHolder = vectorHolder;
+ this.nullabilityHolder = vectorHolder.nullabilityHolder();
+ this.accessor = getVectorAccessor(vectorHolder);
+ }
+
+ public FieldVector getFieldVector() {
+ // TODO Convert dictionary encoded vectors to correctly typed arrow vector.
+ // e.g. convert long dictionary encoded vector to a BigIntVector.
+ return vectorHolder.vector();
+ }
+
+ public boolean hasNull() {
+ return nullabilityHolder.hasNulls();
+ }
+
+ public int numNulls() {
+ return nullabilityHolder.numNulls();
+ }
+
+ @Override
+ public void close() {
+ accessor.close();
+ }
+
+ public boolean isNullAt(int rowId) {
+ return nullabilityHolder.isNullAt(rowId) == 1;
+ }
+
+ public boolean getBoolean(int rowId) {
+ return accessor.getBoolean(rowId);
+ }
+
+ public int getInt(int rowId) {
+ return accessor.getInt(rowId);
+ }
+
+ public long getLong(int rowId) {
+ return accessor.getLong(rowId);
+ }
+
+ public float getFloat(int rowId) {
+ return accessor.getFloat(rowId);
+ }
+
+ public double getDouble(int rowId) {
+ return accessor.getDouble(rowId);
+ }
+
+ public String getString(int rowId) {
+ if (isNullAt(rowId)) {
+ return null;
+ }
+ return accessor.getUTF8String(rowId);
+ }
+
+ public byte[] getBinary(int rowId) {
+ if (isNullAt(rowId)) {
+ return null;
+ }
+ return accessor.getBinary(rowId);
+ }
+
+ private static ArrowVectorAccessor<?, String, ?, ?> getVectorAccessor(VectorHolder holder) {
+ return ArrowVectorAccessors.getVectorAccessor(holder);
+ }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnarBatch.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnarBatch.java
new file mode 100644
index 0000000..976447e
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnarBatch.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.Arrays;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This class is inspired by Spark's {@code ColumnarBatch}.
+ * This class wraps a columnar batch in the result set of an Iceberg table query.
+ */
+public class ColumnarBatch implements AutoCloseable {
+
+ private final int numRows;
+ private final ColumnVector[] columns;
+
+ ColumnarBatch(int numRows, ColumnVector[] columns) {
+ for (int i = 0; i < columns.length; i++) {
+ int columnValueCount = columns[i].getFieldVector().getValueCount();
+ Preconditions.checkArgument(numRows == columnValueCount,
+ "Number of rows (=" + numRows + ") != column[" + i + "] size (=" + columnValueCount + ")");
+ }
+ this.numRows = numRows;
+ this.columns = columns;
+ }
+
+ /**
+ * Create a new instance of {@link VectorSchemaRoot}
+ * from the arrow vectors stored in this arrow batch.
+ * The arrow vectors are owned by the reader.
+ */
+ public VectorSchemaRoot createVectorSchemaRootFromVectors() {
+ return VectorSchemaRoot.of(Arrays.stream(columns)
+ .map(ColumnVector::getFieldVector)
+ .toArray(FieldVector[]::new));
+ }
+
+ /**
+ * Called to close all the columns in this batch. It is not valid to access the data after calling this. This must be
+ * called at the end to clean up memory allocations.
+ */
+ @Override
+ public void close() {
+ for (ColumnVector c : columns) {
+ c.close();
+ }
+ }
+
+ /**
+ * Returns the number of columns that make up this batch.
+ */
+ public int numCols() {
+ return columns.length;
+ }
+
+ /**
+ * Returns the number of rows for read, including filtered rows.
+ */
+ public int numRows() {
+ return numRows;
+ }
+
+ /**
+ * Returns the column at {@code ordinal}.
+ */
+ public ColumnVector column(int ordinal) {
+ return columns[ordinal];
+ }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
new file mode 100644
index 0000000..df4e216
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * This class creates typed {@link ArrowVectorAccessor} instances from a {@link VectorHolder}.
+ * It provides a generic implementation for following Arrow types:
+ * <ul>
+ * <li>Decimal type can be deserialized to a type that supports decimal,
+ * e.g. BigDecimal or Spark's Decimal.</li>
+ * <li>UTF8 String type can be deserialized to a Java String or Spark's UTF8String.</li>
+ * <li>List type: the child elements of a list can be deserialized to Spark's ColumnarArray or similar type.</li>
+ * <li>Struct type: the child elements of a struct can be deserialized to a Spark's ArrowColumnVector
+ * or similar type.</li>
+ * </ul>
+ * @param <DecimalT> A concrete type that can represent a decimal.
+ * @param <Utf8StringT> A concrete type that can represent a UTF8 string.
+ * @param <ArrayT> A concrete type that can represent an array value in a list vector, e.g. Spark's ColumnarArray.
+ * @param <ChildVectorT> A concrete type that can represent a child vector in a struct, e.g. Spark's ArrowColumnVector.
+ */
+public class GenericArrowVectorAccessorFactory<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable> {
+
+ private final Supplier<DecimalFactory<DecimalT>> decimalFactorySupplier;
+ private final Supplier<StringFactory<Utf8StringT>> stringFactorySupplier;
+ private final Supplier<StructChildFactory<ChildVectorT>> structChildFactorySupplier;
+ private final Supplier<ArrayFactory<ChildVectorT, ArrayT>> arrayFactorySupplier;
+
+ /**
+  * Creates a factory parameterized by suppliers of the decimal, string, struct-child and
+  * array factories. The suppliers are only stored here; each one is invoked when an
+  * accessor of the corresponding kind is built. A supplier for an unsupported type may
+  * raise an {@link UnsupportedOperationException}.
+  *
+  * @param decimalFactorySupplier supplies the factory used for decimal values
+  * @param stringFactorySupplier supplies the factory used for UTF8 string values
+  * @param structChildFactorySupplier supplies the factory used to wrap struct child vectors
+  * @param arrayFactorySupplier supplies the factory used for list (array) values
+  */
+ protected GenericArrowVectorAccessorFactory(
+     Supplier<DecimalFactory<DecimalT>> decimalFactorySupplier,
+     Supplier<StringFactory<Utf8StringT>> stringFactorySupplier,
+     Supplier<StructChildFactory<ChildVectorT>> structChildFactorySupplier,
+     Supplier<ArrayFactory<ChildVectorT, ArrayT>> arrayFactorySupplier) {
+   // Scalar value factories.
+   this.stringFactorySupplier = stringFactorySupplier;
+   this.decimalFactorySupplier = decimalFactorySupplier;
+   // Nested type factories.
+   this.arrayFactorySupplier = arrayFactorySupplier;
+   this.structChildFactorySupplier = structChildFactorySupplier;
+ }
+
+ /**
+  * Returns an accessor for the vector carried by {@code holder}.
+  *
+  * <p>When the holder reports a dictionary-encoded vector, the accessor is chosen from the
+  * holder's column descriptor and decodes through the holder's dictionary; otherwise the
+  * accessor is chosen from the concrete Arrow vector class.
+  *
+  * @param holder the vector holder to create an accessor for
+  * @return an accessor matching the holder's vector
+  */
+ public ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getVectorAccessor(VectorHolder holder) {
+   Dictionary dictionary = holder.dictionary();
+   boolean dictionaryEncoded = holder.isDictionaryEncoded();
+   FieldVector vector = holder.vector();
+
+   if (!dictionaryEncoded) {
+     return getPlainVectorAccessor(vector);
+   }
+
+   ColumnDescriptor desc = holder.descriptor();
+   return getDictionaryVectorAccessor(dictionary, desc, vector, desc.getPrimitiveType());
+ }
+
+ /**
+  * Chooses the accessor for a dictionary-encoded column. The vector must be an
+  * {@link IntVector} holding dictionary ids. Columns without a Parquet original (logical)
+  * type are dispatched on the primitive type name; all others on the original type.
+  *
+  * @throws UnsupportedOperationException if the type has no dictionary accessor here
+  */
+ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDictionaryVectorAccessor(
+     Dictionary dictionary,
+     ColumnDescriptor desc,
+     FieldVector vector, PrimitiveType primitive) {
+   Preconditions.checkState(vector instanceof IntVector, "Dictionary ids should be stored in IntVectors only");
+   IntVector dictionaryIds = (IntVector) vector;
+
+   if (primitive.getOriginalType() == null) {
+     // No logical type annotation: decide purely on the physical type.
+     switch (primitive.getPrimitiveTypeName()) {
+       case FIXED_LEN_BYTE_ARRAY:
+       case BINARY:
+         return new DictionaryBinaryAccessor<>(dictionaryIds, dictionary);
+       case FLOAT:
+         return new DictionaryFloatAccessor<>(dictionaryIds, dictionary);
+       case INT64:
+         return new DictionaryLongAccessor<>(dictionaryIds, dictionary);
+       case DOUBLE:
+         return new DictionaryDoubleAccessor<>(dictionaryIds, dictionary);
+       default:
+         throw new UnsupportedOperationException("Unsupported type: " + primitive);
+     }
+   }
+
+   switch (desc.getPrimitiveType().getOriginalType()) {
+     case ENUM:
+     case JSON:
+     case UTF8:
+     case BSON:
+       return new DictionaryStringAccessor<>(dictionaryIds, dictionary, stringFactorySupplier.get());
+     case INT_64:
+     case TIMESTAMP_MILLIS:
+     case TIMESTAMP_MICROS:
+       return new DictionaryLongAccessor<>(dictionaryIds, dictionary);
+     case DECIMAL:
+       return getDictionaryDecimalAccessor(dictionaryIds, dictionary, primitive);
+     default:
+       throw new UnsupportedOperationException(
+           "Unsupported logical type: " + primitive.getOriginalType());
+   }
+ }
+
+ /**
+  * Chooses the dictionary decimal accessor that matches the physical type backing the decimal.
+  *
+  * @throws UnsupportedOperationException if the decimal is backed by an unexpected physical type
+  */
+ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDictionaryDecimalAccessor(
+     IntVector dictionaryIds, Dictionary dictionary, PrimitiveType primitive) {
+   switch (primitive.getPrimitiveTypeName()) {
+     case BINARY:
+     case FIXED_LEN_BYTE_ARRAY:
+       return new DictionaryDecimalBinaryAccessor<>(dictionaryIds, dictionary, decimalFactorySupplier.get());
+     case INT64:
+       return new DictionaryDecimalLongAccessor<>(dictionaryIds, dictionary, decimalFactorySupplier.get());
+     case INT32:
+       return new DictionaryDecimalIntAccessor<>(dictionaryIds, dictionary, decimalFactorySupplier.get());
+     default:
+       throw new UnsupportedOperationException(
+           "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+   }
+ }
+
+ /**
+  * Chooses the accessor for a vector that is not dictionary-encoded, based on the concrete
+  * Arrow vector class. The checks run in a fixed order and the first match wins.
+  *
+  * @throws UnsupportedOperationException if the vector class has no accessor here
+  */
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT>
+     getPlainVectorAccessor(FieldVector vector) {
+   if (vector instanceof BitVector) {
+     return new BooleanAccessor<>((BitVector) vector);
+   }
+   if (vector instanceof IntVector) {
+     return new IntAccessor<>((IntVector) vector);
+   }
+   if (vector instanceof BigIntVector) {
+     return new LongAccessor<>((BigIntVector) vector);
+   }
+   if (vector instanceof Float4Vector) {
+     return new FloatAccessor<>((Float4Vector) vector);
+   }
+   if (vector instanceof Float8Vector) {
+     return new DoubleAccessor<>((Float8Vector) vector);
+   }
+   if (vector instanceof DecimalVector) {
+     return new DecimalAccessor<>((DecimalVector) vector, decimalFactorySupplier.get());
+   }
+   if (vector instanceof VarCharVector) {
+     return new StringAccessor<>((VarCharVector) vector, stringFactorySupplier.get());
+   }
+   if (vector instanceof VarBinaryVector) {
+     return new BinaryAccessor<>((VarBinaryVector) vector);
+   }
+   if (vector instanceof DateDayVector) {
+     return new DateAccessor<>((DateDayVector) vector);
+   }
+   if (vector instanceof TimeStampMicroTZVector) {
+     return new TimestampMicroTzAccessor<>((TimeStampMicroTZVector) vector);
+   }
+   if (vector instanceof TimeStampMicroVector) {
+     return new TimestampMicroAccessor<>((TimeStampMicroVector) vector);
+   }
+   if (vector instanceof ListVector) {
+     return new ArrayAccessor<>((ListVector) vector, arrayFactorySupplier.get());
+   }
+   if (vector instanceof StructVector) {
+     return new StructAccessor<>((StructVector) vector, structChildFactorySupplier.get());
+   }
+   throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
+ }
+
+ private static class BooleanAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final BitVector vector;
+
+ BooleanAccessor(BitVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final boolean getBoolean(int rowId) {
+ return vector.get(rowId) == 1;
+ }
+ }
+
+ /**
+  * Accessor over an {@link IntVector}. The value can be read as an int or, widened, as a long.
+  */
+ private static class IntAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final IntVector ints;
+
+   IntAccessor(IntVector intVector) {
+     super(intVector);
+     this.ints = intVector;
+   }
+
+   @Override
+   public final int getInt(int rowId) {
+     return ints.get(rowId);
+   }
+
+   /** Returns the int value at {@code rowId} widened to a long. */
+   @Override
+   public final long getLong(int rowId) {
+     return ints.get(rowId);
+   }
+ }
+
+ /**
+  * Accessor over a {@link BigIntVector} that exposes each row as a long.
+  */
+ private static class LongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final BigIntVector longs;
+
+   LongAccessor(BigIntVector bigIntVector) {
+     super(bigIntVector);
+     this.longs = bigIntVector;
+   }
+
+   @Override
+   public final long getLong(int rowId) {
+     return longs.get(rowId);
+   }
+ }
+
+ private static class DictionaryLongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final long[] decodedDictionary;
+
+ DictionaryLongAccessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+ .mapToLong(dictionary::decodeToLong)
+ .toArray();
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return decodedDictionary[offsetVector.get(rowId)];
+ }
+ }
+
+ /**
+  * Accessor over a {@link Float4Vector}. The value can be read as a float or, widened, as a double.
+  */
+ private static class FloatAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final Float4Vector floats;
+
+   FloatAccessor(Float4Vector floatVector) {
+     super(floatVector);
+     this.floats = floatVector;
+   }
+
+   @Override
+   public final float getFloat(int rowId) {
+     return floats.get(rowId);
+   }
+
+   /** Returns the float value at {@code rowId} widened to a double. */
+   @Override
+   public final double getDouble(int rowId) {
+     return floats.get(rowId);
+   }
+ }
+
+ private static class DictionaryFloatAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final float[] decodedDictionary;
+
+ DictionaryFloatAccessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = new float[dictionary.getMaxId() + 1];
+ for (int i = 0; i <= dictionary.getMaxId(); i++) {
+ decodedDictionary[i] = dictionary.decodeToFloat(i);
+ }
+ }
+
+ @Override
+ public final float getFloat(int rowId) {
+ return decodedDictionary[offsetVector.get(rowId)];
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return getFloat(rowId);
+ }
+ }
+
+ /**
+  * Accessor over a {@link Float8Vector} that exposes each row as a double.
+  */
+ private static class DoubleAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final Float8Vector doubles;
+
+   DoubleAccessor(Float8Vector doubleVector) {
+     super(doubleVector);
+     this.doubles = doubleVector;
+   }
+
+   @Override
+   public final double getDouble(int rowId) {
+     return doubles.get(rowId);
+   }
+ }
+
+ private static class DictionaryDoubleAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final double[] decodedDictionary;
+
+ DictionaryDoubleAccessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+ .mapToDouble(dictionary::decodeToDouble)
+ .toArray();
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return decodedDictionary[offsetVector.get(rowId)];
+ }
+ }
+
+ /**
+  * Accessor over a {@link VarCharVector}. Each read delegates to
+  * {@link StringFactory#ofRow(VarCharVector, int)} to build the string value.
+  */
+ private static class StringAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final StringFactory<Utf8StringT> strings;
+   private final VarCharVector chars;
+
+   StringAccessor(VarCharVector vector, StringFactory<Utf8StringT> stringFactory) {
+     super(vector);
+     this.strings = stringFactory;
+     this.chars = vector;
+   }
+
+   @Override
+   public final Utf8StringT getUTF8String(int rowId) {
+     return strings.ofRow(chars, rowId);
+   }
+ }
+
+ /**
+  * Accessor for dictionary-encoded string columns. Every dictionary entry (ids 0 through
+  * {@code dictionary.getMaxId()}) is decoded to bytes and converted with
+  * {@link StringFactory#ofBytes(byte[])} once in the constructor; reads then index the
+  * decoded array with the dictionary id stored in the vector.
+  */
+ private static class DictionaryStringAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+   private final Utf8StringT[] stringsById;
+   private final IntVector dictionaryIds;
+
+   DictionaryStringAccessor(IntVector vector, Dictionary dictionary, StringFactory<Utf8StringT> stringFactory) {
+     super(vector);
+     this.dictionaryIds = vector;
+     int entryCount = dictionary.getMaxId() + 1;
+     Utf8StringT[] decoded = genericArray(stringFactory.getGenericClass(), entryCount);
+     for (int id = 0; id < entryCount; id++) {
+       decoded[id] = stringFactory.ofBytes(dictionary.decodeToBinary(id).getBytes());
+     }
+     this.stringsById = decoded;
+   }
+
+   @Override
+   public final Utf8StringT getUTF8String(int rowId) {
+     return stringsById[dictionaryIds.get(rowId)];
+   }
+ }
+
+ /**
+  * Accessor over a {@link VarBinaryVector} that exposes each row as a byte array.
+  */
+ private static class BinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final VarBinaryVector binaries;
+
+   BinaryAccessor(VarBinaryVector binaryVector) {
+     super(binaryVector);
+     this.binaries = binaryVector;
+   }
+
+   @Override
+   public final byte[] getBinary(int rowId) {
+     return binaries.get(rowId);
+   }
+ }
+
+ /**
+  * Accessor for dictionary-encoded binary columns. Every dictionary entry (ids 0 through
+  * {@code dictionary.getMaxId()}) is decoded to a byte array once in the constructor; reads
+  * then return the array stored for the row's dictionary id (the same array instance on
+  * every read of that id).
+  */
+ private static class DictionaryBinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+   private final IntVector dictionaryIds;
+   private final byte[][] bytesById;
+
+   DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
+     super(vector);
+     this.dictionaryIds = vector;
+     int entryCount = dictionary.getMaxId() + 1;
+     byte[][] decoded = new byte[entryCount][];
+     for (int id = 0; id < entryCount; id++) {
+       Binary entry = dictionary.decodeToBinary(id);
+       decoded[id] = entry.getBytes();
+     }
+     this.bytesById = decoded;
+   }
+
+   @Override
+   public final byte[] getBinary(int rowId) {
+     return bytesById[dictionaryIds.get(rowId)];
+   }
+ }
+
+ /**
+  * Accessor over a {@link DateDayVector} that exposes the stored int value of each row unchanged.
+  */
+ private static class DateAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final DateDayVector days;
+
+   DateAccessor(DateDayVector dateVector) {
+     super(dateVector);
+     this.days = dateVector;
+   }
+
+   @Override
+   public final int getInt(int rowId) {
+     return days.get(rowId);
+   }
+ }
+
+ /**
+  * Accessor over a {@link TimeStampMicroTZVector} that exposes the stored long value of
+  * each row unchanged.
+  */
+ private static class TimestampMicroTzAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final TimeStampMicroTZVector timestamps;
+
+   TimestampMicroTzAccessor(TimeStampMicroTZVector timestampVector) {
+     super(timestampVector);
+     this.timestamps = timestampVector;
+   }
+
+   @Override
+   public final long getLong(int rowId) {
+     return timestamps.get(rowId);
+   }
+ }
+
+ /**
+  * Accessor over a {@link TimeStampMicroVector} (timestamp without time zone) that exposes
+  * the stored long value of each row unchanged.
+  */
+ private static class TimestampMicroAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final TimeStampMicroVector timestamps;
+
+   TimestampMicroAccessor(TimeStampMicroVector timestampVector) {
+     super(timestampVector);
+     this.timestamps = timestampVector;
+   }
+
+   @Override
+   public final long getLong(int rowId) {
+     return timestamps.get(rowId);
+   }
+ }
+
+ /**
+  * Accessor over a {@link ListVector}. The list's data vector is wrapped once in the
+  * constructor via {@link ArrayFactory#ofChild(ValueVector)}; each read passes the list
+  * vector, the wrapped child and the row id to {@link ArrayFactory#ofRow}.
+  */
+ private static class ArrayAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   private final ArrayFactory<ChildVectorT, ArrayT> arrays;
+   private final ListVector listVector;
+   private final ChildVectorT elements;
+
+   ArrayAccessor(ListVector vector, ArrayFactory<ChildVectorT, ArrayT> arrayFactory) {
+     super(vector);
+     this.arrays = arrayFactory;
+     this.listVector = vector;
+     this.elements = arrayFactory.ofChild(vector.getDataVector());
+   }
+
+   @Override
+   public final ArrayT getArray(int rowId) {
+     return arrays.ofRow(listVector, elements, rowId);
+   }
+ }
+
+ /**
+  * Accessor over a {@link StructVector}. Each child vector (ids 0 up to, but excluding,
+  * {@code structVector.size()}) is wrapped with the struct child factory, and the resulting
+  * array is handed to the {@link ArrowVectorAccessor} constructor.
+  */
+ private static class StructAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+   StructAccessor(StructVector structVector, StructChildFactory<ChildVectorT> structChildFactory) {
+     super(structVector, wrapChildren(structVector, structChildFactory));
+   }
+
+   /** Wraps every child of {@code struct}, in child-id order, into an array of the factory's type. */
+   private static <V> V[] wrapChildren(StructVector struct, StructChildFactory<V> factory) {
+     return IntStream.range(0, struct.size())
+         .mapToObj(childId -> factory.of(struct.getVectorById(childId)))
+         .toArray(genericArray(factory.getGenericClass()));
+   }
+ }
+
+ private static class DecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final DecimalVector vector;
+ private final DecimalFactory<DecimalT> decimalFactory;
+
+ DecimalAccessor(DecimalVector vector, DecimalFactory<DecimalT> decimalFactory) {
+ super(vector);
+ this.vector = vector;
+ this.decimalFactory = decimalFactory;
+ }
+
+ @Override
+ public final DecimalT getDecimal(int rowId, int precision, int scale) {
+ return decimalFactory.ofBigDecimal(
+ DecimalUtility.getBigDecimalFromArrowBuf(vector.getDataBuffer(), rowId, scale),
+ precision, scale);
+ }
+ }
+
+ /**
+  * Base accessor for dictionary-encoded decimal columns.
+  *
+  * <p>Reads look up the dictionary id stored in the vector and decode the matching
+  * dictionary entry lazily; the decoded value is cached per dictionary id. The cache is
+  * keyed by dictionary id only, so the {@code precision} and {@code scale} of the first
+  * read of an id determine the cached value for that id.
+  *
+  * <p>Subclasses decode straight to {@code DecimalT} rather than to a {@code long}: a decimal
+  * backed by BINARY or FIXED_LEN_BYTE_ARRAY can carry an unscaled value wider than 64 bits,
+  * which a {@code long} intermediate would silently truncate.
+  */
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ private abstract static class
+     DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+   private final DecimalT[] cache;
+   private final DecimalFactory<DecimalT> decimalFactory;
+   private final Dictionary parquetDictionary;
+   private final IntVector offsetVector;
+
+   private DictionaryDecimalAccessor(
+       IntVector vector,
+       Dictionary dictionary,
+       DecimalFactory<DecimalT> decimalFactory) {
+     super(vector);
+     this.offsetVector = vector;
+     this.parquetDictionary = dictionary;
+     this.decimalFactory = decimalFactory;
+     // One cache slot per dictionary id (ids run from 0 to getMaxId()).
+     this.cache = genericArray(decimalFactory.getGenericClass(), dictionary.getMaxId() + 1);
+   }
+
+   /** Returns the Parquet dictionary that holds the encoded decimal values. */
+   protected final Dictionary parquetDictionary() {
+     return parquetDictionary;
+   }
+
+   /** Returns the factory used to build {@code DecimalT} values. */
+   protected final DecimalFactory<DecimalT> decimalFactory() {
+     return decimalFactory;
+   }
+
+   @Override
+   public final DecimalT getDecimal(int rowId, int precision, int scale) {
+     int dictId = offsetVector.get(rowId);
+     if (cache[dictId] == null) {
+       cache[dictId] = decode(dictId, precision, scale);
+     }
+     return cache[dictId];
+   }
+
+   /**
+    * Decodes the dictionary entry {@code dictId} into a decimal of the given precision and scale.
+    */
+   protected abstract DecimalT decode(int dictId, int precision, int scale);
+ }
+
+ /**
+  * Dictionary decimal accessor for decimals stored as BINARY or FIXED_LEN_BYTE_ARRAY.
+  * The entry's bytes are interpreted by {@link BigInteger#BigInteger(byte[])} as the
+  * unscaled value, so values wider than 64 bits are preserved.
+  */
+ private static class
+     DictionaryDecimalBinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   DictionaryDecimalBinaryAccessor(IntVector vector, Dictionary dictionary, DecimalFactory<DecimalT> decimalFactory) {
+     super(vector, dictionary, decimalFactory);
+   }
+
+   @Override
+   protected DecimalT decode(int dictId, int precision, int scale) {
+     byte[] unscaledBytes = parquetDictionary().decodeToBinary(dictId).getBytes();
+     // Keep the full-width unscaled value; converting through long would drop high-order bits.
+     BigDecimal value = new BigDecimal(new BigInteger(unscaledBytes), scale);
+     return decimalFactory().ofBigDecimal(value, precision, scale);
+   }
+ }
+
+ /**
+  * Dictionary decimal accessor for decimals stored as INT64; the entry is the unscaled long value.
+  */
+ private static class DictionaryDecimalLongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   DictionaryDecimalLongAccessor(IntVector vector, Dictionary dictionary, DecimalFactory<DecimalT> decimalFactory) {
+     super(vector, dictionary, decimalFactory);
+   }
+
+   @Override
+   protected DecimalT decode(int dictId, int precision, int scale) {
+     return decimalFactory().ofLong(parquetDictionary().decodeToLong(dictId), precision, scale);
+   }
+ }
+
+ /**
+  * Dictionary decimal accessor for decimals stored as INT32; the entry is the unscaled int value,
+  * widened to a long before conversion.
+  */
+ private static class DictionaryDecimalIntAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+     extends DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+   DictionaryDecimalIntAccessor(IntVector vector, Dictionary dictionary, DecimalFactory<DecimalT> decimalFactory) {
+     super(vector, dictionary, decimalFactory);
+   }
+
+   @Override
+   protected DecimalT decode(int dictId, int precision, int scale) {
+     return decimalFactory().ofLong(parquetDictionary().decodeToInt(dictId), precision, scale);
+   }
+ }
+
+ /**
+ * Factory that creates decimal values of type {@code DecimalT} from values read out of
+ * arrow vectors or Parquet dictionaries.
+ * @param <DecimalT> A concrete type that can represent a decimal, e.g., Spark's Decimal.
+ */
+ protected interface DecimalFactory<DecimalT> {
+ /**
+ * Class of the concrete decimal type. It is used to allocate arrays of {@code DecimalT}.
+ */
+ Class<DecimalT> getGenericClass();
+
+ /**
+ * Create a decimal from the given long value, precision and scale.
+ * The dictionary decimal accessors pass the long decoded from the Parquet dictionary
+ * as {@code value}.
+ */
+ DecimalT ofLong(long value, int precision, int scale);
+
+ /**
+ * Create a decimal from the given {@link BigDecimal} value, precision and scale.
+ */
+ DecimalT ofBigDecimal(BigDecimal value, int precision, int scale);
+ }
+
+ /**
+ * Factory that creates UTF8 string values of type {@code Utf8StringT} from arrow vector
+ * rows or raw bytes.
+ * @param <Utf8StringT> A concrete type that can represent a UTF8 string.
+ */
+ protected interface StringFactory<Utf8StringT> {
+ /**
+ * Class of the concrete UTF8 String type. It is used to allocate arrays of {@code Utf8StringT}.
+ */
+ Class<Utf8StringT> getGenericClass();
+
+ /**
+ * Create a UTF8 String from the value stored at row {@code rowId} of the arrow vector.
+ */
+ Utf8StringT ofRow(VarCharVector vector, int rowId);
+
+ /**
+ * Create a UTF8 String from the byte array. The dictionary string accessor passes the
+ * bytes of a decoded Parquet dictionary entry.
+ */
+ Utf8StringT ofBytes(byte[] bytes);
+ }
+
+ /**
+ * Factory that creates array values of type {@code ArrayT} from the rows of a list vector.
+ * @param <ChildVectorT> A concrete type that can represent a child vector,
+ * e.g. Spark's ArrowColumnVector.
+ * @param <ArrayT> A concrete type that can represent an array value in a list vector,
+ * e.g. Spark's ColumnarArray.
+ */
+ protected interface ArrayFactory<ChildVectorT, ArrayT> {
+ /**
+ * Create a child vector of type {@code ChildVectorT} from the arrow child vector.
+ * The array accessor calls this once with the data vector of the list vector.
+ */
+ ChildVectorT ofChild(ValueVector childVector);
+
+ /**
+ * Create an array of type {@code ArrayT} for the row {@code rowId}.
+ * The array accessor passes the list vector as {@code vector} and the result of
+ * {@link #ofChild(ValueVector)} as {@code childData}.
+ */
+ ArrayT ofRow(ValueVector vector, ChildVectorT childData, int rowId);
+ }
+
+ /**
+ * Factory that wraps the child vectors of a struct vector into type {@code ChildVectorT}.
+ * @param <ChildVectorT> A concrete type that can represent a child vector in a struct,
+ * e.g. Spark's ArrowColumnVector.
+ */
+ protected interface StructChildFactory<ChildVectorT> {
+ /**
+ * Class of the concrete child vector type. It is used to allocate arrays of {@code ChildVectorT}.
+ */
+ Class<ChildVectorT> getGenericClass();
+
+ /**
+ * Create the child vector of type such as Spark's ArrowColumnVector from the arrow child vector.
+ * The struct accessor calls this once for every child of the struct vector.
+ */
+ ChildVectorT of(ValueVector childVector);
+ }
+
+ /**
+  * Returns an array generator (suitable for {@code Stream.toArray}) that creates arrays whose
+  * component type is {@code genericClass}.
+  */
+ private static <T> IntFunction<T[]> genericArray(Class<T> genericClass) {
+   IntFunction<T[]> generator = size -> genericArray(genericClass, size);
+   return generator;
+ }
+
+ /**
+  * Creates a new array of {@code length} elements whose component type is {@code genericClass}.
+  */
+ @SuppressWarnings("unchecked")
+ private static <T> T[] genericArray(Class<T> genericClass, int length) {
+   // Array.newInstance returns Object; the runtime component type is genericClass.
+   Object untyped = Array.newInstance(genericClass, length);
+   return (T[]) untyped;
+ }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 6c97c27..5ed323f 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -31,6 +31,7 @@ import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
@@ -231,7 +232,11 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
break;
case TIMESTAMP_MICROS:
this.vec = arrowField.createVector(rootAlloc);
- ((TimeStampMicroTZVector) vec).allocateNew(batchSize);
+ if (((Types.TimestampType) icebergField.type()).shouldAdjustToUTC()) {
+ ((TimeStampMicroTZVector) vec).allocateNew(batchSize);
+ } else {
+ ((TimeStampMicroVector) vec).allocateNew(batchSize);
+ }
this.readType = ReadType.LONG;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
new file mode 100644
index 0000000..77bd089
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
+  private final MessageType parquetSchema;
+  private final Schema icebergSchema;
+  private final BufferAllocator rootAllocator;
+  private final Map<Integer, ?> idToConstant;
+  private final boolean setArrowValidityVector;
+  private final Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
+
+  /**
+   * Creates a builder that visits a Parquet schema and produces vectorized readers.
+   * A child allocator named "VectorizedReadBuilder" is created from the shared root allocator
+   * and handed to every {@link VectorizedArrowReader} this builder creates.
+   *
+   * @param expectedSchema the Iceberg schema used to look up fields by id
+   * @param parquetSchema the Parquet schema used to resolve column descriptors
+   * @param setArrowValidityVector forwarded to each created {@link VectorizedArrowReader}
+   * @param idToConstant constant values keyed by field id; a field present here is read as a constant
+   * @param readerFactory combines the per-field readers into the reader returned for the message
+   */
+  public VectorizedReaderBuilder(
+      Schema expectedSchema,
+      MessageType parquetSchema,
+      boolean setArrowValidityVector, Map<Integer, ?> idToConstant,
+      Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory) {
+    this.icebergSchema = expectedSchema;
+    this.parquetSchema = parquetSchema;
+    this.idToConstant = idToConstant;
+    this.setArrowValidityVector = setArrowValidityVector;
+    this.readerFactory = readerFactory;
+    this.rootAllocator = ArrowAllocation.rootAllocator()
+        .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+  }
+
+  /**
+   * Orders the field readers to match the expected Iceberg struct and hands them to the
+   * reader factory. For each expected field, in order of precedence: a constant reader if the
+   * id has a constant, a position reader for the row-position metadata column, the reader
+   * built for the Parquet field with the same id, or otherwise a reader that produces nulls.
+   */
+  @Override
+  public VectorizedReader<?> message(
+      Types.StructType expected, MessageType message,
+      List<VectorizedReader<?>> fieldReaders) {
+    List<Type> fields = message.asGroupType().getFields();
+
+    // Index the readers by Parquet field id; fields without an id cannot be matched.
+    Map<Integer, VectorizedReader<?>> readersById = Maps.newHashMap();
+    IntStream.range(0, fields.size()).forEach(pos -> {
+      if (fields.get(pos).getId() != null) {
+        readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos));
+      }
+    });
+
+    List<Types.NestedField> icebergFields;
+    if (expected != null) {
+      icebergFields = expected.fields();
+    } else {
+      icebergFields = ImmutableList.of();
+    }
+
+    List<VectorizedReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
+        icebergFields.size());
+
+    for (Types.NestedField field : icebergFields) {
+      int id = field.fieldId();
+      if (idToConstant.containsKey(id)) {
+        reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(idToConstant.get(id)));
+        continue;
+      }
+      if (id == MetadataColumns.ROW_POSITION.fieldId()) {
+        reorderedFields.add(VectorizedArrowReader.positions());
+        continue;
+      }
+      VectorizedReader<?> reader = readersById.get(id);
+      if (reader != null) {
+        reorderedFields.add(reader);
+      } else {
+        reorderedFields.add(VectorizedArrowReader.nulls());
+      }
+    }
+    return readerFactory.apply(reorderedFields);
+  }
+
+  /**
+   * Struct fields are not supported by vectorized reads: returns {@code null} when no struct
+   * is expected and throws otherwise.
+   */
+  @Override
+  public VectorizedReader<?> struct(
+      Types.StructType expected, GroupType groupType,
+      List<VectorizedReader<?>> fieldReaders) {
+    if (expected == null) {
+      return null;
+    }
+    throw new UnsupportedOperationException("Vectorized reads are not supported yet for struct fields");
+  }
+
+  /**
+   * Creates the arrow reader for a primitive Parquet field. Returns {@code null} when the
+   * field has no id, when the column has a repetition level above zero (nested types are not
+   * yet supported), or when the Iceberg schema has no field with that id.
+   */
+  @Override
+  public VectorizedReader<?> primitive(
+      org.apache.iceberg.types.Type.PrimitiveType expected,
+      PrimitiveType primitive) {
+
+    if (primitive.getId() == null) {
+      return null;
+    }
+    int parquetFieldId = primitive.getId().intValue();
+
+    ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
+    boolean repeated = desc.getMaxRepetitionLevel() > 0;
+    if (repeated) {
+      // Nested types not yet supported for vectorized reads
+      return null;
+    }
+
+    Types.NestedField icebergField = icebergSchema.findField(parquetFieldId);
+    if (icebergField == null) {
+      return null;
+    }
+
+    // Set the validity buffer if null checking is enabled in arrow
+    return new VectorizedArrowReader(desc, icebergField, rootAllocator, setArrowValidityVector);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedTableScanIterable.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedTableScanIterable.java
new file mode 100644
index 0000000..e5a81e8
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedTableScanIterable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * A vectorized implementation of the Iceberg reader that iterates over the table scan.
+ * See {@link ArrowReader} for details.
+ */
+public class VectorizedTableScanIterable extends CloseableGroup implements CloseableIterable<ColumnarBatch> {
+
+ private static final int BATCH_SIZE_IN_NUM_ROWS = 1 << 16;
+
+ private final ArrowReader reader;
+ private final CloseableIterable<CombinedScanTask> tasks;
+
+ /**
+ * Create a new instance using default values for {@code batchSize} and {@code reuseContainers}.
+ * The {@code batchSize} is set to {@link #BATCH_SIZE_IN_NUM_ROWS} and {@code reuseContainers}
+ * is set to {@code false}.
+ *
+ */
+ public VectorizedTableScanIterable(TableScan scan) {
+ this(scan, BATCH_SIZE_IN_NUM_ROWS, false);
+ }
+
+ /**
+ * Create a new instance.
+ *
+ * See {@link ArrowReader#ArrowReader(TableScan, int, boolean)} for details.
+ */
+ public VectorizedTableScanIterable(TableScan scan, int batchSize, boolean reuseContainers) {
+ this.reader = new ArrowReader(scan, batchSize, reuseContainers);
+ // start planning tasks in the background
+ this.tasks = scan.planTasks();
+ }
+
+ @Override
+ public CloseableIterator<ColumnarBatch> iterator() {
+ CloseableIterator<ColumnarBatch> iter = reader.open(tasks);
+ addCloseable(iter);
+ return iter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ tasks.close(); // close manifests from scan planning
+ super.close(); // close data files
+ }
+}
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
new file mode 100644
index 0000000..6829173
--- /dev/null
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link ArrowReader}.
+ * <p>All tests create a table with monthly partitions and write 1 year of data to the table.
+ */
+public class ArrowReaderTest {
+
+ // Number of rows written to each monthly partition.
+ private static final int NUM_ROWS_PER_MONTH = 20;
+
+ // Names of all columns of the test table. Every required column is followed by its "_nullable" twin,
+ // except for "int_promotion", which has no twin.
+ private static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of(
+     "timestamp", "timestamp_nullable",
+     "boolean", "boolean_nullable",
+     "int", "int_nullable",
+     "long", "long_nullable",
+     "float", "float_nullable",
+     "double", "double_nullable",
+     "timestamp_tz", "timestamp_tz_nullable",
+     "string", "string_nullable",
+     "bytes", "bytes_nullable",
+     "date", "date_nullable",
+     "int_promotion");
+
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
+
+ // Assigned by writeTable before any of them is read.
+ private HadoopTables tables;
+
+ private String tableLocation;
+ private List<GenericRecord> rowsWritten;
+
+ /**
+ * Read all rows and columns from the table without any filter. The test asserts that the Arrow {@link
+ * VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the vectors
+ * contains expected values. The test also asserts the total number of rows match the expected value.
+ */
+ @Test
+ public void testReadAll() throws Exception {
+ writeTableWithIncrementalRecords();
+ Table table = tables.load(tableLocation);
+ readAndCheckQueryResult(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+ }
+
+ /**
+ * This test writes each partition with constant-value rows. The Arrow vectors returned are mostly of type int32,
+ * which is unexpected. This happens because of dictionary encoding at the storage level.
+ * <p>
+ * Following are the expected and actual Arrow schema:
+ * <pre>
+ * Expected Arrow Schema:
+ * timestamp: Timestamp(MICROSECOND, null) not null,
+ * timestamp_nullable: Timestamp(MICROSECOND, null),
+ * boolean: Bool not null,
+ * boolean_nullable: Bool,
+ * int: Int(32, true) not null,
+ * int_nullable: Int(32, true),
+ * long: Int(64, true) not null,
+ * long_nullable: Int(64, true),
+ * float: FloatingPoint(SINGLE) not null,
+ * float_nullable: FloatingPoint(SINGLE),
+ * double: FloatingPoint(DOUBLE) not null,
+ * double_nullable: FloatingPoint(DOUBLE),
+ * timestamp_tz: Timestamp(MICROSECOND, UTC) not null,
+ * timestamp_tz_nullable: Timestamp(MICROSECOND, UTC),
+ * string: Utf8 not null,
+ * string_nullable: Utf8,
+ * bytes: Binary not null,
+ * bytes_nullable: Binary,
+ * date: Date(DAY) not null,
+ * date_nullable: Date(DAY),
+ * int_promotion: Int(32, true) not null
+ *
+ * Actual Arrow Schema:
+ * timestamp: Int(32, true) not null,
+ * timestamp_nullable: Int(32, true),
+ * boolean: Bool not null,
+ * boolean_nullable: Bool,
+ * int: Int(32, true) not null,
+ * int_nullable: Int(32, true),
+ * long: Int(32, true) not null,
+ * long_nullable: Int(32, true),
+ * float: Int(32, true) not null,
+ * float_nullable: Int(32, true),
+ * double: Int(32, true) not null,
+ * double_nullable: Int(32, true),
+ * timestamp_tz: Int(32, true) not null,
+ * timestamp_tz_nullable: Int(32, true),
+ * string: Int(32, true) not null,
+ * string_nullable: Int(32, true),
+ * bytes: Int(32, true) not null,
+ * bytes_nullable: Int(32, true),
+ * date: Date(DAY) not null,
+ * date_nullable: Date(DAY),
+ * int_promotion: Int(32, true) not null
+ * </pre>
+ * <p>
+ * TODO: fix the returned Arrow vectors to have vector types consistent with Iceberg types.
+ * <p>
+ * Read all rows and columns from the table without any filter. The test asserts that the Arrow {@link
+ * VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the vectors
+ * contain the expected values. The test also asserts that the total number of rows matches the expected value.
+ */
+ @Test
+ @Ignore("Dictionary-encoded columns are returned as int32 vectors instead of vectors matching the Iceberg types")
+ public void testReadAllWithConstantRecords() throws Exception {
+   // Every partition holds constant-value rows; the resulting vectors currently come back mostly as int32,
+   // so the schema and vector type assertions below cannot pass yet.
+   writeTableWithConstantRecords();
+   Table table = tables.load(tableLocation);
+   readAndCheckQueryResult(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+ }
+
+ /**
+ * Read all rows and columns from the table without any filter. The test uses a batch size smaller than the number of
+ * rows in a partition. The test asserts that the Arrow {@link VectorSchemaRoot} contains the expected schema and
+ * expected vector types. Then the test asserts that the vectors contains expected values. The test also asserts the
+ * total number of rows match the expected value.
+ */
+ @Test
+ public void testReadAllWithSmallerBatchSize() throws Exception {
+ writeTableWithIncrementalRecords();
+ Table table = tables.load(tableLocation);
+ TableScan scan = table.newScan();
+ readAndCheckQueryResult(scan, 10, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+ }
+
+ /**
+ * Read selected rows and all columns from the table using a time range row filter. The test asserts that the Arrow
+ * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the
+ * vectors contains expected values. The test also asserts the total number of rows match the expected value.
+ */
+ @Test
+ public void testReadRangeFilter() throws Exception {
+ writeTableWithIncrementalRecords();
+ Table table = tables.load(tableLocation);
+ LocalDateTime beginTime = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
+ LocalDateTime endTime = LocalDateTime.of(2020, 2, 1, 0, 0, 0);
+ TableScan scan = table.newScan()
+ .filter(Expressions.and(
+ Expressions.greaterThanOrEqual("timestamp", timestampToMicros(beginTime)),
+ Expressions.lessThan("timestamp", timestampToMicros(endTime))));
+ readAndCheckQueryResult(scan, NUM_ROWS_PER_MONTH, NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+ }
+
+ /**
+  * Reads all columns with a time range filter covering January 2021, a month for which no rows are written.
+  * The test asserts that no batch is returned.
+  */
+ @Test
+ public void testReadRangeFilterEmptyResult() throws Exception {
+   writeTableWithIncrementalRecords();
+   Table table = tables.load(tableLocation);
+   long beginMicros = timestampToMicros(LocalDateTime.of(2021, 1, 1, 0, 0, 0));
+   long endMicros = timestampToMicros(LocalDateTime.of(2021, 2, 1, 0, 0, 0));
+   TableScan emptyScan = table.newScan().filter(
+       Expressions.and(
+           Expressions.greaterThanOrEqual("timestamp", beginMicros),
+           Expressions.lessThan("timestamp", endMicros)));
+
+   int batchCount = 0;
+   try (VectorizedTableScanIterable batches = new VectorizedTableScanIterable(emptyScan, NUM_ROWS_PER_MONTH, false)) {
+     // only the number of batches matters, their content is never inspected
+     for (ColumnarBatch ignored : batches) {
+       batchCount += 1;
+     }
+   }
+   assertEquals(0, batchCount);
+ }
+
+ /**
+  * Reads all rows but only three selected columns. The check asserts that each Arrow {@link VectorSchemaRoot}
+  * has the expected schema and vector types, that the vectors contain the expected values, and that the total
+  * number of rows matches the expected value.
+  */
+ @Test
+ public void testReadColumnFilter1() throws Exception {
+   writeTableWithIncrementalRecords();
+   List<String> projected = ImmutableList.of("timestamp", "int", "string");
+   TableScan projection = tables.load(tableLocation).newScan().select("timestamp", "int", "string");
+   readAndCheckQueryResult(projection, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, projected);
+ }
+
+ /**
+  * Reads all rows but only a single selected column. The check asserts that each Arrow {@link VectorSchemaRoot}
+  * has the expected schema and vector types, that the vectors contain the expected values, and that the total
+  * number of rows matches the expected value.
+  */
+ @Test
+ public void testReadColumnFilter2() throws Exception {
+   writeTableWithIncrementalRecords();
+   List<String> projected = ImmutableList.of("timestamp");
+   TableScan projection = tables.load(tableLocation).newScan().select("timestamp");
+   readAndCheckQueryResult(projection, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, projected);
+ }
+
+ /**
+  * Reads the scan twice and verifies the result in two representations: first as {@link ColumnarBatch}es,
+  * then as Arrow {@link VectorSchemaRoot}s.
+  *
+  * @param scan the scan to read
+  * @param numRowsPerRoot batch size passed to the reader and number of rows expected in every batch
+  * @param expectedTotalRows total number of rows expected over all batches
+  * @param columns names of the columns expected in the result
+  */
+ private void readAndCheckQueryResult(
+     TableScan scan, int numRowsPerRoot, int expectedTotalRows, List<String> columns) throws IOException {
+   // pass 1: the ColumnarBatch view must match the rows written
+   readAndCheckColumnarBatch(scan, numRowsPerRoot, columns);
+   // pass 2: the VectorSchemaRoot view must match the rows written
+   readAndCheckArrowResult(scan, numRowsPerRoot, expectedTotalRows, columns);
+ }
+
+ /**
+  * Reads the scan in batches of {@code numRowsPerRoot} rows and compares every {@link ColumnarBatch} with the
+  * equally sized slice of {@code rowsWritten} at the same offset.
+  */
+ private void readAndCheckColumnarBatch(
+     TableScan scan, int numRowsPerRoot, List<String> columns) throws IOException {
+   int offset = 0;
+   try (VectorizedTableScanIterable batches = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+     for (ColumnarBatch batch : batches) {
+       int end = offset + numRowsPerRoot;
+       List<GenericRecord> slice = rowsWritten.subList(offset, end);
+       checkColumnarBatch(numRowsPerRoot, slice, batch, columns);
+       offset = end;
+     }
+   }
+ }
+
+ /**
+  * Reads the scan in batches of {@code numRowsPerRoot} rows, converts every batch to a {@link VectorSchemaRoot}
+  * and checks its schema, vector types and values against the equally sized slice of {@code rowsWritten} at the
+  * same offset. Finally asserts that the row counts of all roots add up to {@code expectedTotalRows}.
+  */
+ private void readAndCheckArrowResult(
+     TableScan scan, int numRowsPerRoot, int expectedTotalRows, List<String> columns) throws IOException {
+   Set<String> selected = ImmutableSet.copyOf(columns);
+   int offset = 0;
+   int rowsSeen = 0;
+   try (VectorizedTableScanIterable batches = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+     for (ColumnarBatch batch : batches) {
+       int end = offset + numRowsPerRoot;
+       List<GenericRecord> slice = rowsWritten.subList(offset, end);
+       VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
+
+       assertEquals(createExpectedArrowSchema(selected), root.getSchema());
+       checkAllVectorTypes(root, selected);
+       checkAllVectorValues(numRowsPerRoot, slice, root, selected);
+
+       offset = end;
+       rowsSeen += root.getRowCount();
+     }
+   }
+   assertEquals(expectedTotalRows, rowsSeen);
+ }
+
+ /**
+  * Verifies one {@link ColumnarBatch} against the rows expected in it: the row count, the column count and, for
+  * every known column that is listed in {@code columns}, each value read through the {@link ColumnVector}.
+  * Columns are checked in table order; a required column is always followed by its "_nullable" twin.
+  */
+ private void checkColumnarBatch(
+     int expectedNumRows,
+     List<GenericRecord> expectedRows,
+     ColumnarBatch batch,
+     List<String> columns) {
+
+   // position of every requested column inside the batch
+   Map<String, Integer> positions = new HashMap<>();
+   int position = 0;
+   for (String column : columns) {
+     positions.put(column, position);
+     position += 1;
+   }
+   Set<String> requested = positions.keySet();
+
+   assertEquals(expectedNumRows, batch.numRows());
+   assertEquals(columns.size(), batch.numCols());
+
+   // vector-side converters shared by a column and its nullable twin
+   BiFunction<ColumnVector, Integer, Object> timestamps =
+       (vector, row) -> timestampFromMicros(vector.getLong(row));
+   BiFunction<ColumnVector, Integer, Object> floatBits =
+       (vector, row) -> Float.floatToIntBits(vector.getFloat(row));
+   BiFunction<ColumnVector, Integer, Object> doubleBits =
+       (vector, row) -> Double.doubleToLongBits(vector.getDouble(row));
+   BiFunction<ColumnVector, Integer, Object> binaries =
+       (vector, row) -> ByteBuffer.wrap(vector.getBinary(row));
+   BiFunction<ColumnVector, Integer, Object> dates =
+       (vector, row) -> dateFromDay(vector.getInt(row));
+
+   for (String name : new String[] {"timestamp", "timestamp_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> rows.get(row).getField(name),
+         timestamps);
+   }
+   for (String name : new String[] {"boolean", "boolean_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> rows.get(row).getField(name),
+         ColumnVector::getBoolean);
+   }
+   for (String name : new String[] {"int", "int_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> rows.get(row).getField(name),
+         ColumnVector::getInt);
+   }
+   for (String name : new String[] {"long", "long_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> rows.get(row).getField(name),
+         ColumnVector::getLong);
+   }
+   // floats and doubles are compared by their bit patterns
+   for (String name : new String[] {"float", "float_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> Float.floatToIntBits((float) rows.get(row).getField(name)),
+         floatBits);
+   }
+   for (String name : new String[] {"double", "double_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> Double.doubleToLongBits((double) rows.get(row).getField(name)),
+         doubleBits);
+   }
+   // timestamps with zone are compared as microseconds since the epoch
+   for (String name : new String[] {"timestamp_tz", "timestamp_tz_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> timestampToMicros((OffsetDateTime) rows.get(row).getField(name)),
+         ColumnVector::getLong);
+   }
+   for (String name : new String[] {"string", "string_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> rows.get(row).getField(name),
+         ColumnVector::getString);
+   }
+   for (String name : new String[] {"bytes", "bytes_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> rows.get(row).getField(name),
+         binaries);
+   }
+   for (String name : new String[] {"date", "date_nullable"}) {
+     checkColumnarArrayValues(
+         expectedNumRows, expectedRows, batch, positions.get(name), requested, name,
+         (rows, row) -> rows.get(row).getField(name),
+         dates);
+   }
+   checkColumnarArrayValues(
+       expectedNumRows, expectedRows, batch, positions.get("int_promotion"), requested, "int_promotion",
+       (rows, row) -> rows.get(row).getField("int_promotion"),
+       ColumnVector::getInt);
+ }
+
+ /**
+  * Compares the first {@code expectedNumRows} values of one column with the expected rows. Does nothing when
+  * {@code columnName} is not contained in {@code columnSet}.
+  *
+  * @param expectedNumRows number of rows to compare
+  * @param expectedRows rows holding the expected values
+  * @param columnBatch batch holding the actual values
+  * @param columnIndex index of the column inside the batch; only read when the column is in {@code columnSet}
+  * @param columnSet names of the columns present in the batch
+  * @param columnName name of the column to check
+  * @param expectedValueExtractor extracts the expected value for a row index from {@code expectedRows}
+  * @param vectorValueExtractor extracts the actual value for a row index from the column vector
+  */
+ private static void checkColumnarArrayValues(
+     int expectedNumRows,
+     List<GenericRecord> expectedRows,
+     ColumnarBatch columnBatch,
+     Integer columnIndex,
+     Set<String> columnSet,
+     String columnName,
+     BiFunction<List<GenericRecord>, Integer, Object> expectedValueExtractor,
+     BiFunction<ColumnVector, Integer, Object> vectorValueExtractor) {
+   if (!columnSet.contains(columnName)) {
+     // column was not selected, so there is nothing to compare
+     return;
+   }
+   ColumnVector vector = columnBatch.column(columnIndex);
+   for (int row = 0; row < expectedNumRows; row++) {
+     Object expected = expectedValueExtractor.apply(expectedRows, row);
+     Object actual = vectorValueExtractor.apply(vector, row);
+     assertEquals("Row#" + row + " mismatches", expected, actual);
+   }
+ }
+
+ private void writeTableWithConstantRecords() throws Exception {
+ writeTable(true);
+ }
+
+ private void writeTableWithIncrementalRecords() throws Exception {
+ writeTable(false);
+ }
+
+ /**
+  * Creates the test table, partitioned by month of {@code timestamp}, writes one Parquet file for each month of
+  * 2020 and commits all files in a single overwrite. Afterwards the {@code int_promotion} column is promoted from
+  * int to long.
+  *
+  * @param constantRecords whether each file holds constant-value records instead of incremental ones
+  */
+ private void writeTable(boolean constantRecords) throws Exception {
+   rowsWritten = new ArrayList<>();
+   tables = new HadoopTables();
+   tableLocation = temp.newFolder("test").toString();
+
+   Schema schema = new Schema(
+       Types.NestedField.required(1, "timestamp", Types.TimestampType.withoutZone()),
+       Types.NestedField.optional(2, "timestamp_nullable", Types.TimestampType.withoutZone()),
+       Types.NestedField.required(3, "boolean", Types.BooleanType.get()),
+       Types.NestedField.optional(4, "boolean_nullable", Types.BooleanType.get()),
+       Types.NestedField.required(5, "int", Types.IntegerType.get()),
+       Types.NestedField.optional(6, "int_nullable", Types.IntegerType.get()),
+       Types.NestedField.required(7, "long", Types.LongType.get()),
+       Types.NestedField.optional(8, "long_nullable", Types.LongType.get()),
+       Types.NestedField.required(9, "float", Types.FloatType.get()),
+       Types.NestedField.optional(10, "float_nullable", Types.FloatType.get()),
+       Types.NestedField.required(11, "double", Types.DoubleType.get()),
+       Types.NestedField.optional(12, "double_nullable", Types.DoubleType.get()),
+       Types.NestedField.required(13, "timestamp_tz", Types.TimestampType.withZone()),
+       Types.NestedField.optional(14, "timestamp_tz_nullable", Types.TimestampType.withZone()),
+       Types.NestedField.required(15, "string", Types.StringType.get()),
+       Types.NestedField.optional(16, "string_nullable", Types.StringType.get()),
+       Types.NestedField.required(17, "bytes", Types.BinaryType.get()),
+       Types.NestedField.optional(18, "bytes_nullable", Types.BinaryType.get()),
+       Types.NestedField.required(19, "date", Types.DateType.get()),
+       Types.NestedField.optional(20, "date_nullable", Types.DateType.get()),
+       Types.NestedField.required(21, "int_promotion", Types.IntegerType.get()));
+
+   PartitionSpec monthly = PartitionSpec.builderFor(schema).month("timestamp").build();
+   Table table = tables.create(schema, monthly, tableLocation);
+
+   // one data file per month of 2020, all added in a single overwrite
+   OverwriteFiles overwrite = table.newOverwrite();
+   for (int month = 1; month <= 12; month++) {
+     LocalDateTime firstOfMonth = LocalDateTime.of(2020, month, 1, 0, 0, 0);
+     List<GenericRecord> records = constantRecords ?
+         createConstantRecordsForDate(table.schema(), firstOfMonth) :
+         createIncrementalRecordsForDate(table.schema(), firstOfMonth);
+     overwrite.addFile(writeParquetFile(table, records));
+   }
+   overwrite.commit();
+
+   // Perform a type promotion
+   // TODO: The read Arrow vector should be of type BigInt (promoted type) but it is Int (old type).
+   tables.load(tableLocation)
+       .updateSchema()
+       .updateColumn("int_promotion", Types.LongType.get())
+       .commit();
+ }
+
+ /**
+  * Builds the Arrow schema expected for the given column selection. Fields appear in table order; every base
+  * column contributes a non-nullable field and a nullable "_nullable" twin of the same Arrow type, followed by
+  * the non-nullable {@code int_promotion} field. Fields whose name is not in {@code columnSet} are left out.
+  */
+ private static org.apache.arrow.vector.types.pojo.Schema createExpectedArrowSchema(Set<String> columnSet) {
+   String[] baseNames = {
+       "timestamp", "boolean", "int", "long", "float", "double", "timestamp_tz", "string", "bytes", "date"
+   };
+   ArrowType[] arrowTypes = {
+       MinorType.TIMESTAMPMICRO.getType(),
+       MinorType.BIT.getType(),
+       MinorType.INT.getType(),
+       MinorType.BIGINT.getType(),
+       MinorType.FLOAT4.getType(),
+       MinorType.FLOAT8.getType(),
+       new ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND, "UTC"),
+       MinorType.VARCHAR.getType(),
+       MinorType.VARBINARY.getType(),
+       MinorType.DATEDAY.getType()
+   };
+
+   List<Field> selected = new ArrayList<>();
+   for (int k = 0; k < baseNames.length; k++) {
+     String required = baseNames[k];
+     String optional = required + "_nullable";
+     if (columnSet.contains(required)) {
+       selected.add(new Field(required, new FieldType(false, arrowTypes[k], null), null));
+     }
+     if (columnSet.contains(optional)) {
+       selected.add(new Field(optional, new FieldType(true, arrowTypes[k], null), null));
+     }
+   }
+   // the promoted column is still expected with its original int type
+   if (columnSet.contains("int_promotion")) {
+     selected.add(new Field("int_promotion", new FieldType(false, MinorType.INT.getType(), null), null));
+   }
+   return new org.apache.arrow.vector.types.pojo.Schema(selected);
+ }
+
+ /**
+  * Creates {@code NUM_ROWS_PER_MONTH} records whose values grow with the row index: timestamps advance by one
+  * day per row from {@code datetime}, zoned timestamps by one minute per row, and dates by one day per row from
+  * 2020-01-01. A column and its "_nullable" twin always receive equal values.
+  */
+ private List<GenericRecord> createIncrementalRecordsForDate(Schema schema, LocalDateTime datetime) {
+   List<GenericRecord> records = new ArrayList<>();
+   for (int row = 0; row < NUM_ROWS_PER_MONTH; row++) {
+     GenericRecord rec = GenericRecord.create(schema);
+
+     LocalDateTime timestamp = datetime.plusDays(row);
+     boolean even = row % 2 == 0;
+     long asLong = 2L * row;
+     float asFloat = 3.0f * row;
+     double asDouble = 4.0 * row;
+     OffsetDateTime timestampTz = datetime.plusMinutes(row).atOffset(ZoneOffset.UTC);
+     String text = "String-" + row;
+     String bytesText = "Bytes-" + row;
+     LocalDate date = LocalDate.of(2020, 1, 1).plusDays(row);
+
+     rec.setField("timestamp", timestamp);
+     rec.setField("timestamp_nullable", timestamp);
+     rec.setField("boolean", even);
+     rec.setField("boolean_nullable", even);
+     rec.setField("int", row);
+     rec.setField("int_nullable", row);
+     rec.setField("long", asLong);
+     rec.setField("long_nullable", asLong);
+     rec.setField("float", asFloat);
+     rec.setField("float_nullable", asFloat);
+     rec.setField("double", asDouble);
+     rec.setField("double_nullable", asDouble);
+     rec.setField("timestamp_tz", timestampTz);
+     rec.setField("timestamp_tz_nullable", timestampTz);
+     rec.setField("string", text);
+     rec.setField("string_nullable", text);
+     // every bytes field gets its own buffer over its own array
+     rec.setField("bytes", ByteBuffer.wrap(bytesText.getBytes(StandardCharsets.UTF_8)));
+     rec.setField("bytes_nullable", ByteBuffer.wrap(bytesText.getBytes(StandardCharsets.UTF_8)));
+     rec.setField("date", date);
+     rec.setField("date_nullable", date);
+     rec.setField("int_promotion", row);
+     records.add(rec);
+   }
+   return records;
+ }
+
+ /**
+  * Creates {@code NUM_ROWS_PER_MONTH} records that all carry the same values; only the timestamp columns depend
+  * on the {@code datetime} argument. A column and its "_nullable" twin always receive equal values.
+  */
+ private List<GenericRecord> createConstantRecordsForDate(Schema schema, LocalDateTime datetime) {
+   // immutable values shared by all records
+   OffsetDateTime timestampTz = datetime.atOffset(ZoneOffset.UTC);
+   LocalDate date = LocalDate.of(2020, 1, 1);
+
+   List<GenericRecord> records = new ArrayList<>();
+   int remaining = NUM_ROWS_PER_MONTH;
+   while (remaining > 0) {
+     GenericRecord rec = GenericRecord.create(schema);
+     rec.setField("timestamp", datetime);
+     rec.setField("timestamp_nullable", datetime);
+     rec.setField("boolean", true);
+     rec.setField("boolean_nullable", true);
+     rec.setField("int", 1);
+     rec.setField("int_nullable", 1);
+     rec.setField("long", 2L);
+     rec.setField("long_nullable", 2L);
+     rec.setField("float", 3.0f);
+     rec.setField("float_nullable", 3.0f);
+     rec.setField("double", 4.0);
+     rec.setField("double_nullable", 4.0);
+     rec.setField("timestamp_tz", timestampTz);
+     rec.setField("timestamp_tz_nullable", timestampTz);
+     rec.setField("string", "String");
+     rec.setField("string_nullable", "String");
+     // every bytes field gets its own buffer over its own array
+     rec.setField("bytes", ByteBuffer.wrap("Bytes".getBytes(StandardCharsets.UTF_8)));
+     rec.setField("bytes_nullable", ByteBuffer.wrap("Bytes".getBytes(StandardCharsets.UTF_8)));
+     rec.setField("date", date);
+     rec.setField("date_nullable", date);
+     rec.setField("int_promotion", 1);
+     records.add(rec);
+     remaining--;
+   }
+   return records;
+ }
+
+ /**
+  * Writes the records to a new Parquet file in the temporary folder, remembers them in {@code rowsWritten} and
+  * returns the {@link DataFile} describing the file. The partition is derived from the first record.
+  *
+  * @param table table providing the schema and the partition spec
+  * @param records records to write; the first record is read to compute the partition
+  * @return the data file, carrying the partition, the input file, the appender metrics and the Parquet format
+  * @throws IOException if creating, writing or closing the file fails
+  */
+ private DataFile writeParquetFile(Table table, List<GenericRecord> records) throws IOException {
+   rowsWritten.addAll(records);
+   File parquetFile = temp.newFile();
+   // only the path is needed; presumably the writer must create the file itself -- TODO confirm
+   assertTrue(parquetFile.delete());
+   FileAppender<GenericRecord> appender = Parquet.write(Files.localOutput(parquetFile))
+       .schema(table.schema())
+       .createWriterFunc(GenericParquetWriter::buildWriter)
+       .build();
+   // try-with-resources keeps a failure of addAll as the primary exception; a failure of close() is attached
+   // to it as suppressed instead of replacing it
+   try (FileAppender<GenericRecord> closeableAppender = appender) {
+     closeableAppender.addAll(records);
+   }
+
+   PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+   partitionKey.partition(new LocalDateTimeToLongMicros(records.get(0)));
+
+   // the metrics are read after the appender has been closed
+   return DataFiles.builder(table.spec())
+       .withPartition(partitionKey)
+       .withInputFile(localInput(parquetFile))
+       .withMetrics(appender.metrics())
+       .withFormat(FileFormat.PARQUET)
+       .build();
+ }
+
+ /** Returns the microseconds between the epoch and the given date-time, interpreted at the UTC offset. */
+ private static long timestampToMicros(LocalDateTime value) {
+   return Instant.EPOCH.until(value.toInstant(ZoneOffset.UTC), ChronoUnit.MICROS);
+ }
+
+ /** Returns the microseconds between the epoch and the instant denoted by the given offset date-time. */
+ private static long timestampToMicros(OffsetDateTime value) {
+   return Instant.EPOCH.until(value.toInstant(), ChronoUnit.MICROS);
+ }
+
+ /**
+  * Converts microseconds since the epoch to a {@link LocalDateTime} at the UTC offset.
+  *
+  * @param micros microseconds since 1970-01-01T00:00:00, may be negative
+  * @return the matching date-time with microsecond precision
+  */
+ private static LocalDateTime timestampFromMicros(long micros) {
+   // floorDiv/floorMod keep the sub-second part in [0, 1_000_000) for timestamps before the epoch as well
+   long epochSeconds = Math.floorDiv(micros, 1_000_000L);
+   long microsOfSecond = Math.floorMod(micros, 1_000_000L);
+   return LocalDateTime.ofEpochSecond(
+       epochSeconds,
+       (int) TimeUnit.MICROSECONDS.toNanos(microsOfSecond),
+       ZoneOffset.UTC
+   );
+ }
+
+ /**
+ * Converts a day count relative to 1970-01-01 into a {@link LocalDate}.
+ */
+ private static LocalDate dateFromDay(int day) {
+ long epochDay = day;
+ return LocalDate.ofEpochDay(epochDay);
+ }
+
+ /**
+ * Verifies that every requested column is backed by the expected Arrow vector class.
+ *
+ * <p>Each base column and its {@code _nullable} twin must use the same vector class. Columns that are
+ * not in {@code columnSet} are skipped by {@code assertEqualsForField}.
+ */
+ private void checkAllVectorTypes(VectorSchemaRoot root, Set<String> columnSet) {
+ // Parallel arrays: baseColumns[i] is expected to be stored in vectorClasses[i].
+ String[] baseColumns = {
+ "timestamp", "boolean", "int", "long", "float", "double", "timestamp_tz", "string", "bytes", "date"
+ };
+ Class<?>[] vectorClasses = {
+ TimeStampMicroVector.class, BitVector.class, IntVector.class, BigIntVector.class, Float4Vector.class,
+ Float8Vector.class, TimeStampMicroTZVector.class, VarCharVector.class, VarBinaryVector.class,
+ DateDayVector.class
+ };
+ for (int i = 0; i < baseColumns.length; i++) {
+ assertEqualsForField(root, columnSet, baseColumns[i], vectorClasses[i]);
+ assertEqualsForField(root, columnSet, baseColumns[i] + "_nullable", vectorClasses[i]);
+ }
+ // int_promotion has no nullable counterpart.
+ assertEqualsForField(root, columnSet, "int_promotion", IntVector.class);
+ }
+
+ /**
+ * Asserts that the vector stored under {@code columnName} in {@code root} is exactly of class
+ * {@code expected}. Columns that are not in {@code columnSet} are not checked.
+ */
+ private void assertEqualsForField(
+ VectorSchemaRoot root, Set<String> columnSet, String columnName, Class<?> expected) {
+ if (!columnSet.contains(columnName)) {
+ return;
+ }
+ Class<?> actual = root.getVector(columnName).getClass();
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Asserts that {@code root} holds {@code expectedNumRows} rows and that, for every known column, the
+ * values stored in the Arrow vector match the values of {@code expectedRows}.
+ *
+ * <p>Each column is checked through {@code checkVectorValues} with a pair of extractors: one reading the
+ * expected value from the records and one reading the actual value from the vector. Both sides are
+ * converted to a common representation before comparison.
+ *
+ * @param expectedNumRows number of rows expected in the root and in every checked vector
+ * @param expectedRows records holding the expected values, indexed by row
+ * @param root Arrow batch under test
+ * @param columnSet names of the columns to check; {@code checkVectorValues} skips all other columns
+ */
+ private void checkAllVectorValues(
+ int expectedNumRows,
+ List<GenericRecord> expectedRows,
+ VectorSchemaRoot root,
+ Set<String> columnSet) {
+ assertEquals(expectedNumRows, root.getRowCount());
+
+ // Timestamps without zone: the vector's micros are converted back to LocalDateTime for comparison.
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "timestamp",
+ (records, i) -> records.get(i).getField("timestamp"),
+ (vector, i) -> timestampFromMicros(((TimeStampMicroVector) vector).get(i))
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "timestamp_nullable",
+ (records, i) -> records.get(i).getField("timestamp_nullable"),
+ (vector, i) -> timestampFromMicros(((TimeStampMicroVector) vector).get(i))
+ );
+ // Booleans: BitVector stores 0/1 integers, so the value is compared against 1.
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "boolean",
+ (records, i) -> records.get(i).getField("boolean"),
+ (vector, i) -> ((BitVector) vector).get(i) == 1
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "boolean_nullable",
+ (records, i) -> records.get(i).getField("boolean_nullable"),
+ (vector, i) -> ((BitVector) vector).get(i) == 1
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "int",
+ (records, i) -> records.get(i).getField("int"),
+ (vector, i) -> ((IntVector) vector).get(i)
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "int_nullable",
+ (records, i) -> records.get(i).getField("int_nullable"),
+ (vector, i) -> ((IntVector) vector).get(i)
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "long",
+ (records, i) -> records.get(i).getField("long"),
+ (vector, i) -> ((BigIntVector) vector).get(i)
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "long_nullable",
+ (records, i) -> records.get(i).getField("long_nullable"),
+ (vector, i) -> ((BigIntVector) vector).get(i)
+ );
+ // Floats and doubles are compared through their bit patterns rather than as floating-point values.
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "float",
+ (records, i) -> Float.floatToIntBits((float) records.get(i).getField("float")),
+ (vector, i) -> Float.floatToIntBits(((Float4Vector) vector).get(i))
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "float_nullable",
+ (records, i) -> Float.floatToIntBits((float) records.get(i).getField("float_nullable")),
+ (vector, i) -> Float.floatToIntBits(((Float4Vector) vector).get(i))
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "double",
+ (records, i) -> Double.doubleToLongBits((double) records.get(i).getField("double")),
+ (vector, i) -> Double.doubleToLongBits(((Float8Vector) vector).get(i))
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "double_nullable",
+ (records, i) -> Double.doubleToLongBits((double) records.get(i).getField("double_nullable")),
+ (vector, i) -> Double.doubleToLongBits(((Float8Vector) vector).get(i))
+ );
+ // Timestamps with zone: the expected OffsetDateTime is converted to micros and compared as a long.
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "timestamp_tz",
+ (records, i) -> timestampToMicros((OffsetDateTime) records.get(i).getField("timestamp_tz")),
+ (vector, i) -> ((TimeStampMicroTZVector) vector).get(i)
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "timestamp_tz_nullable",
+ (records, i) -> timestampToMicros((OffsetDateTime) records.get(i).getField("timestamp_tz_nullable")),
+ (vector, i) -> ((TimeStampMicroTZVector) vector).get(i)
+ );
+ // Strings: the vector's bytes are decoded as UTF-8.
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "string",
+ (records, i) -> records.get(i).getField("string"),
+ (vector, i) -> new String(((VarCharVector) vector).get(i), StandardCharsets.UTF_8)
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "string_nullable",
+ (records, i) -> records.get(i).getField("string_nullable"),
+ (vector, i) -> new String(((VarCharVector) vector).get(i), StandardCharsets.UTF_8)
+ );
+ // Binary: the vector's byte array is wrapped in a ByteBuffer before comparison.
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "bytes",
+ (records, i) -> records.get(i).getField("bytes"),
+ (vector, i) -> ByteBuffer.wrap(((VarBinaryVector) vector).get(i))
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "bytes_nullable",
+ (records, i) -> records.get(i).getField("bytes_nullable"),
+ (vector, i) -> ByteBuffer.wrap(((VarBinaryVector) vector).get(i))
+ );
+ // Dates: the vector's day number is converted to a LocalDate.
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "date",
+ (records, i) -> records.get(i).getField("date"),
+ (vector, i) -> dateFromDay(((DateDayVector) vector).get(i))
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "date_nullable",
+ (records, i) -> records.get(i).getField("date_nullable"),
+ (vector, i) -> dateFromDay(((DateDayVector) vector).get(i))
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "int_promotion",
+ (records, i) -> records.get(i).getField("int_promotion"),
+ (vector, i) -> ((IntVector) vector).get(i)
+ );
+ }
+
+ /**
+ * Compares the first {@code expectedNumRows} values of a single column vector with the expected records.
+ *
+ * <p>Nothing is checked when {@code columnName} is not part of {@code columnSet}. Otherwise the vector's
+ * value count must equal {@code expectedNumRows} and, row by row, the value produced by
+ * {@code vectorValueExtractor} must equal the one produced by {@code expectedValueExtractor}.
+ */
+ private static void checkVectorValues(
+ int expectedNumRows,
+ List<GenericRecord> expectedRows,
+ VectorSchemaRoot root,
+ Set<String> columnSet,
+ String columnName,
+ BiFunction<List<GenericRecord>, Integer, Object> expectedValueExtractor,
+ BiFunction<FieldVector, Integer, Object> vectorValueExtractor) {
+ if (!columnSet.contains(columnName)) {
+ return;
+ }
+
+ FieldVector columnVector = root.getVector(columnName);
+ assertEquals(expectedNumRows, columnVector.getValueCount());
+ for (int rowId = 0; rowId < expectedNumRows; rowId++) {
+ Object expected = expectedValueExtractor.apply(expectedRows, rowId);
+ Object actual = vectorValueExtractor.apply(columnVector, rowId);
+ assertEquals("Row#" + rowId + " mismatches", expected, actual);
+ }
+ }
+
+ /**
+ * {@link StructLike} view over a {@link Record} that exposes {@link LocalDateTime} and
+ * {@link OffsetDateTime} field values as microseconds since the Unix epoch.
+ *
+ * <p>Only those two value types are supported by {@link #get(int, Class)}: any other non-null value and
+ * any null value is rejected with an {@link IllegalArgumentException}.
+ */
+ private static final class LocalDateTimeToLongMicros implements StructLike {
+
+ private final Record row;
+
+ LocalDateTimeToLongMicros(Record row) {
+ this.row = row;
+ }
+
+ @Override
+ public int size() {
+ return row.size();
+ }
+
+ /**
+ * Returns the value at {@code pos} converted to epoch microseconds.
+ *
+ * <p>The result is always a {@link Long}; {@code javaClass} is not consulted, so callers are expected
+ * to request a type that a {@code Long} can be assigned to.
+ *
+ * @throws IllegalArgumentException if the value is null or is neither a {@link LocalDateTime} nor an
+ * {@link OffsetDateTime}
+ */
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ Object value = row.get(pos);
+ if (value instanceof LocalDateTime) {
+ @SuppressWarnings("unchecked")
+ T result = (T) (Long) timestampToMicros((LocalDateTime) value);
+ return result;
+ } else if (value instanceof OffsetDateTime) {
+ // Convert through the OffsetDateTime overload so that the offset is taken into account; going
+ // through toLocalDateTime() would drop it and yield the wrong instant for non-UTC offsets.
+ @SuppressWarnings("unchecked")
+ T result = (T) (Long) timestampToMicros((OffsetDateTime) value);
+ return result;
+ } else if (value != null) {
+ throw new IllegalArgumentException("Unsupported value type: " + value.getClass());
+ } else {
+ throw new IllegalArgumentException("Don't know how to handle null value");
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ row.set(pos, value);
+ }
+ }
+}
diff --git a/build.gradle b/build.gradle
index 5dc6f1a..4f74ee5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -737,9 +737,17 @@ project(':iceberg-arrow') {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
compile("org.apache.arrow:arrow-memory-netty") {
- exclude group: 'io.netty', module: 'netty-common'
exclude group: 'com.google.code.findbugs', module: 'jsr305'
+ exclude group: 'io.netty', module: 'netty-common'
}
+
+ testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
+ // To run ArrowReaderTest test cases, :netty-common is needed.
+ // We import :netty-common through :arrow-memory-netty
+ // so that the same version as used by the :arrow-memory-netty module is picked.
+ testCompile("org.apache.arrow:arrow-memory-netty")
+ testCompile("org.apache.hadoop:hadoop-common")
+ testCompile("org.apache.hadoop:hadoop-mapreduce-client-core")
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
new file mode 100644
index 0000000..c63ce1e
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.math.BigDecimal;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+ /**
+ * Spark binding of {@link GenericArrowVectorAccessorFactory}: it plugs in factories that produce Spark's
+ * {@link Decimal}, {@link UTF8String}, {@link ColumnarArray} and {@link ArrowColumnVector} values.
+ */
+ final class ArrowVectorAccessorFactory
+ extends GenericArrowVectorAccessorFactory<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> {
+
+ ArrowVectorAccessorFactory() {
+ super(DecimalFactoryImpl::new,
+ StringFactoryImpl::new,
+ StructChildFactoryImpl::new,
+ ArrayFactoryImpl::new);
+ }
+
+ /** Creates Spark {@link Decimal} values from unscaled longs or from {@link BigDecimal}s. */
+ private static final class DecimalFactoryImpl implements DecimalFactory<Decimal> {
+ @Override
+ public Class<Decimal> getGenericClass() {
+ return Decimal.class;
+ }
+
+ @Override
+ public Decimal ofLong(long value, int precision, int scale) {
+ return Decimal.apply(value, precision, scale);
+ }
+
+ @Override
+ public Decimal ofBigDecimal(BigDecimal value, int precision, int scale) {
+ return Decimal.apply(value, precision, scale);
+ }
+ }
+
+ /** Creates Spark {@link UTF8String} values from a {@link VarCharVector} row or from a byte array. */
+ private static final class StringFactoryImpl implements StringFactory<UTF8String> {
+ @Override
+ public Class<UTF8String> getGenericClass() {
+ return UTF8String.class;
+ }
+
+ @Override
+ public UTF8String ofRow(VarCharVector vector, int rowId) {
+ int start = vector.getStartOffset(rowId);
+ int end = vector.getEndOffset(rowId);
+
+ // The string is built from the address of the vector's data buffer plus the row's start offset, with
+ // a null base object and a length of (end - start) bytes.
+ // NOTE(review): this presumably references the Arrow buffer memory directly without copying, so the
+ // result would only be valid while the vector's buffer is - confirm.
+ return UTF8String.fromAddress(
+ null,
+ vector.getDataBuffer().memoryAddress() + start,
+ end - start);
+ }
+
+ @Override
+ public UTF8String ofBytes(byte[] bytes) {
+ return UTF8String.fromBytes(bytes);
+ }
+ }
+
+ /** Creates Spark {@link ColumnarArray} values backed by an {@link ArrowColumnVector} of child data. */
+ private static final class ArrayFactoryImpl implements ArrayFactory<ArrowColumnVector, ColumnarArray> {
+ @Override
+ public ArrowColumnVector ofChild(ValueVector childVector) {
+ return new ArrowColumnVector(childVector);
+ }
+
+ @Override
+ public ColumnarArray ofRow(ValueVector vector, ArrowColumnVector childData, int rowId) {
+ // The start and end positions of the row's elements are read from two consecutive entries of the
+ // offset buffer, each ListVector.OFFSET_WIDTH bytes wide.
+ ArrowBuf offsets = vector.getOffsetBuffer();
+ int index = rowId * ListVector.OFFSET_WIDTH;
+ int start = offsets.getInt(index);
+ int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
+ return new ColumnarArray(childData, start, end - start);
+ }
+ }
+
+ /** Wraps the child vectors of a struct in Spark {@link ArrowColumnVector}s. */
+ private static final class StructChildFactoryImpl implements StructChildFactory<ArrowColumnVector> {
+ @Override
+ public Class<ArrowColumnVector> getGenericClass() {
+ return ArrowColumnVector.class;
+ }
+
+ @Override
+ public ArrowColumnVector of(ValueVector childVector) {
+ return new ArrowColumnVector(childVector);
+ }
+ }
+ }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
index fb5f443..f3b3377 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
@@ -19,506 +19,22 @@
package org.apache.iceberg.spark.data.vectorized;
-import java.math.BigInteger;
-import java.util.stream.IntStream;
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.complex.StructVector;
-import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor;
import org.apache.iceberg.arrow.vectorized.VectorHolder;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.unsafe.types.UTF8String;
-import org.jetbrains.annotations.NotNull;
public class ArrowVectorAccessors {
- private ArrowVectorAccessors() {
- }
-
- static ArrowVectorAccessor getVectorAccessor(VectorHolder holder) {
- Dictionary dictionary = holder.dictionary();
- boolean isVectorDictEncoded = holder.isDictionaryEncoded();
- FieldVector vector = holder.vector();
- if (isVectorDictEncoded) {
- ColumnDescriptor desc = holder.descriptor();
- PrimitiveType primitive = desc.getPrimitiveType();
- return getDictionaryVectorAccessor(dictionary, desc, vector, primitive);
- } else {
- return getPlainVectorAccessor(vector);
- }
- }
-
- @NotNull
- private static ArrowVectorAccessor getDictionaryVectorAccessor(
- Dictionary dictionary,
- ColumnDescriptor desc,
- FieldVector vector, PrimitiveType primitive) {
- Preconditions.checkState(vector instanceof IntVector, "Dictionary ids should be stored in IntVectors only");
- if (primitive.getOriginalType() != null) {
- switch (desc.getPrimitiveType().getOriginalType()) {
- case ENUM:
- case JSON:
- case UTF8:
- case BSON:
- return new DictionaryStringAccessor((IntVector) vector, dictionary);
- case INT_64:
- case TIMESTAMP_MILLIS:
- case TIMESTAMP_MICROS:
- return new DictionaryLongAccessor((IntVector) vector, dictionary);
- case DECIMAL:
- switch (primitive.getPrimitiveTypeName()) {
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return new DictionaryDecimalBinaryAccessor(
- (IntVector) vector,
- dictionary);
- case INT64:
- return new DictionaryDecimalLongAccessor(
- (IntVector) vector,
- dictionary);
- case INT32:
- return new DictionaryDecimalIntAccessor(
- (IntVector) vector,
- dictionary);
- default:
- throw new UnsupportedOperationException(
- "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
- }
- default:
- throw new UnsupportedOperationException(
- "Unsupported logical type: " + primitive.getOriginalType());
- }
- } else {
- switch (primitive.getPrimitiveTypeName()) {
- case FIXED_LEN_BYTE_ARRAY:
- case BINARY:
- return new DictionaryBinaryAccessor((IntVector) vector, dictionary);
- case FLOAT:
- return new DictionaryFloatAccessor((IntVector) vector, dictionary);
- case INT64:
- return new DictionaryLongAccessor((IntVector) vector, dictionary);
- case DOUBLE:
- return new DictionaryDoubleAccessor((IntVector) vector, dictionary);
- default:
- throw new UnsupportedOperationException("Unsupported type: " + primitive);
- }
- }
- }
-
- @NotNull
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
- private static ArrowVectorAccessor getPlainVectorAccessor(FieldVector vector) {
- if (vector instanceof BitVector) {
- return new BooleanAccessor((BitVector) vector);
- } else if (vector instanceof IntVector) {
- return new IntAccessor((IntVector) vector);
- } else if (vector instanceof BigIntVector) {
- return new LongAccessor((BigIntVector) vector);
- } else if (vector instanceof Float4Vector) {
- return new FloatAccessor((Float4Vector) vector);
- } else if (vector instanceof Float8Vector) {
- return new DoubleAccessor((Float8Vector) vector);
- } else if (vector instanceof DecimalVector) {
- return new DecimalAccessor((DecimalVector) vector);
- } else if (vector instanceof VarCharVector) {
- return new StringAccessor((VarCharVector) vector);
- } else if (vector instanceof VarBinaryVector) {
- return new BinaryAccessor((VarBinaryVector) vector);
- } else if (vector instanceof DateDayVector) {
- return new DateAccessor((DateDayVector) vector);
- } else if (vector instanceof TimeStampMicroTZVector) {
- return new TimestampAccessor((TimeStampMicroTZVector) vector);
- } else if (vector instanceof ListVector) {
- ListVector listVector = (ListVector) vector;
- return new ArrayAccessor(listVector);
- } else if (vector instanceof StructVector) {
- StructVector structVector = (StructVector) vector;
- return new StructAccessor(structVector);
- }
- throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
- }
-
- private static class BooleanAccessor extends ArrowVectorAccessor {
-
- private final BitVector vector;
-
- BooleanAccessor(BitVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final boolean getBoolean(int rowId) {
- return vector.get(rowId) == 1;
- }
- }
-
- private static class IntAccessor extends ArrowVectorAccessor {
-
- private final IntVector vector;
-
- IntAccessor(IntVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final int getInt(int rowId) {
- return vector.get(rowId);
- }
-
- @Override
- final long getLong(int rowId) {
- return getInt(rowId);
- }
- }
-
- private static class LongAccessor extends ArrowVectorAccessor {
-
- private final BigIntVector vector;
-
- LongAccessor(BigIntVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final long getLong(int rowId) {
- return vector.get(rowId);
- }
- }
-
- private static class DictionaryLongAccessor extends ArrowVectorAccessor {
- private final IntVector offsetVector;
- private final long[] decodedDictionary;
-
- DictionaryLongAccessor(IntVector vector, Dictionary dictionary) {
- super(vector);
- this.offsetVector = vector;
- this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
- .mapToLong(dictionary::decodeToLong)
- .toArray();
- }
+ private static final ArrowVectorAccessorFactory factory = new ArrowVectorAccessorFactory();
- @Override
- final long getLong(int rowId) {
- return decodedDictionary[offsetVector.get(rowId)];
- }
+ static ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector>
+ getVectorAccessor(VectorHolder holder) {
+ return factory.getVectorAccessor(holder);
}
- private static class FloatAccessor extends ArrowVectorAccessor {
-
- private final Float4Vector vector;
-
- FloatAccessor(Float4Vector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final float getFloat(int rowId) {
- return vector.get(rowId);
- }
-
- @Override
- final double getDouble(int rowId) {
- return getFloat(rowId);
- }
- }
-
- private static class DictionaryFloatAccessor extends ArrowVectorAccessor {
- private final IntVector offsetVector;
- private final float[] decodedDictionary;
-
- DictionaryFloatAccessor(IntVector vector, Dictionary dictionary) {
- super(vector);
- this.offsetVector = vector;
- this.decodedDictionary = new float[dictionary.getMaxId() + 1];
- for (int i = 0; i <= dictionary.getMaxId(); i++) {
- decodedDictionary[i] = dictionary.decodeToFloat(i);
- }
- }
-
- @Override
- final float getFloat(int rowId) {
- return decodedDictionary[offsetVector.get(rowId)];
- }
-
- @Override
- final double getDouble(int rowId) {
- return getFloat(rowId);
- }
- }
-
- private static class DoubleAccessor extends ArrowVectorAccessor {
-
- private final Float8Vector vector;
-
- DoubleAccessor(Float8Vector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final double getDouble(int rowId) {
- return vector.get(rowId);
- }
- }
-
- private static class DictionaryDoubleAccessor extends ArrowVectorAccessor {
- private final IntVector offsetVector;
- private final double[] decodedDictionary;
-
- DictionaryDoubleAccessor(IntVector vector, Dictionary dictionary) {
- super(vector);
- this.offsetVector = vector;
- this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
- .mapToDouble(dictionary::decodeToDouble)
- .toArray();
- }
-
- @Override
- final double getDouble(int rowId) {
- return decodedDictionary[offsetVector.get(rowId)];
- }
- }
-
- private static class StringAccessor extends ArrowVectorAccessor {
-
- private final VarCharVector vector;
-
- StringAccessor(VarCharVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final UTF8String getUTF8String(int rowId) {
- int start = vector.getStartOffset(rowId);
- int end = vector.getEndOffset(rowId);
-
- return UTF8String.fromAddress(
- null,
- vector.getDataBuffer().memoryAddress() + start,
- end - start);
- }
- }
-
- private static class DictionaryStringAccessor extends ArrowVectorAccessor {
- private final UTF8String[] decodedDictionary;
- private final IntVector offsetVector;
-
- DictionaryStringAccessor(IntVector vector, Dictionary dictionary) {
- super(vector);
- this.offsetVector = vector;
- this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
- .mapToObj(dictionary::decodeToBinary)
- .map(binary -> UTF8String.fromBytes(binary.getBytes()))
- .toArray(UTF8String[]::new);
- }
-
- @Override
- final UTF8String getUTF8String(int rowId) {
- int offset = offsetVector.get(rowId);
- return decodedDictionary[offset];
- }
- }
-
- private static class BinaryAccessor extends ArrowVectorAccessor {
-
- private final VarBinaryVector vector;
-
- BinaryAccessor(VarBinaryVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final byte[] getBinary(int rowId) {
- return vector.get(rowId);
- }
- }
-
- private static class DictionaryBinaryAccessor extends ArrowVectorAccessor {
- private final IntVector offsetVector;
- private final byte[][] decodedDictionary;
-
- DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
- super(vector);
- this.offsetVector = vector;
- this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
- .mapToObj(dictionary::decodeToBinary)
- .map(Binary::getBytes)
- .toArray(byte[][]::new);
- }
-
- @Override
- final byte[] getBinary(int rowId) {
- int offset = offsetVector.get(rowId);
- return decodedDictionary[offset];
- }
- }
-
- private static class DateAccessor extends ArrowVectorAccessor {
-
- private final DateDayVector vector;
-
- DateAccessor(DateDayVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final int getInt(int rowId) {
- return vector.get(rowId);
- }
- }
-
- private static class TimestampAccessor extends ArrowVectorAccessor {
-
- private final TimeStampMicroTZVector vector;
-
- TimestampAccessor(TimeStampMicroTZVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final long getLong(int rowId) {
- return vector.get(rowId);
- }
- }
-
- private static class ArrayAccessor extends ArrowVectorAccessor {
-
- private final ListVector vector;
- private final ArrowColumnVector arrayData;
-
- ArrayAccessor(ListVector vector) {
- super(vector);
- this.vector = vector;
- this.arrayData = new ArrowColumnVector(vector.getDataVector());
- }
-
- @Override
- final ColumnarArray getArray(int rowId) {
- ArrowBuf offsets = vector.getOffsetBuffer();
- int index = rowId * ListVector.OFFSET_WIDTH;
- int start = offsets.getInt(index);
- int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
- return new ColumnarArray(arrayData, start, end - start);
- }
- }
-
- /**
- * Use {@link IcebergArrowColumnVector#getChild(int)} to get hold of the {@link ArrowColumnVector} vectors holding the
- * struct values.
- */
- private static class StructAccessor extends ArrowVectorAccessor {
- StructAccessor(StructVector structVector) {
- super(structVector, IntStream.range(0, structVector.size())
- .mapToObj(structVector::getVectorById)
- .map(ArrowColumnVector::new)
- .toArray(ArrowColumnVector[]::new));
- }
- }
-
- private static class DecimalAccessor extends ArrowVectorAccessor {
-
- private final DecimalVector vector;
-
- DecimalAccessor(DecimalVector vector) {
- super(vector);
- this.vector = vector;
- }
-
- @Override
- final Decimal getDecimal(int rowId, int precision, int scale) {
- return Decimal.apply(DecimalUtility.getBigDecimalFromArrowBuf(vector.getDataBuffer(), rowId, scale),
- precision, scale);
- }
- }
-
- @SuppressWarnings("checkstyle:VisibilityModifier")
- private abstract static class DictionaryDecimalAccessor extends ArrowVectorAccessor {
- final Decimal[] cache;
- Dictionary parquetDictionary;
- final IntVector offsetVector;
-
- private DictionaryDecimalAccessor(IntVector vector, Dictionary dictionary) {
- super(vector);
- this.offsetVector = vector;
- this.parquetDictionary = dictionary;
- this.cache = new Decimal[dictionary.getMaxId() + 1];
- }
- }
-
- private static class DictionaryDecimalBinaryAccessor extends DictionaryDecimalAccessor {
-
- DictionaryDecimalBinaryAccessor(IntVector vector, Dictionary dictionary) {
- super(vector, dictionary);
- }
-
- @Override
- final Decimal getDecimal(int rowId, int precision, int scale) {
- int dictId = offsetVector.get(rowId);
- if (cache[dictId] == null) {
- cache[dictId] = Decimal.apply(
- new BigInteger(parquetDictionary.decodeToBinary(dictId).getBytes()).longValue(),
- precision,
- scale);
- }
- return cache[dictId];
- }
- }
-
- private static class DictionaryDecimalLongAccessor extends DictionaryDecimalAccessor {
-
- DictionaryDecimalLongAccessor(IntVector vector, Dictionary dictionary) {
- super(vector, dictionary);
- }
-
- @Override
- final Decimal getDecimal(int rowId, int precision, int scale) {
- int dictId = offsetVector.get(rowId);
- if (cache[dictId] == null) {
- cache[dictId] = Decimal.apply(parquetDictionary.decodeToLong(dictId), precision, scale);
- }
- return cache[dictId];
- }
- }
-
- private static class DictionaryDecimalIntAccessor extends DictionaryDecimalAccessor {
-
- DictionaryDecimalIntAccessor(IntVector vector, Dictionary dictionary) {
- super(vector, dictionary);
- }
-
- @Override
- final Decimal getDecimal(int rowId, int precision, int scale) {
- int dictId = offsetVector.get(rowId);
- if (cache[dictId] == null) {
- cache[dictId] = Decimal.apply(parquetDictionary.decodeToInt(dictId), precision, scale);
- }
- return cache[dictId];
- }
+ private ArrowVectorAccessors() {
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index 9b973f4..514eec8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.data.vectorized;
+import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
@@ -38,7 +39,7 @@ import org.apache.spark.unsafe.types.UTF8String;
*/
public class IcebergArrowColumnVector extends ColumnVector {
- private final ArrowVectorAccessor accessor;
+ private final ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor;
private final NullabilityHolder nullabilityHolder;
public IcebergArrowColumnVector(VectorHolder holder) {
@@ -150,7 +151,7 @@ public class IcebergArrowColumnVector extends ColumnVector {
new IcebergArrowColumnVector(holder);
}
- public ArrowVectorAccessor vectorAccessor() {
+ public ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> vectorAccessor() {
return accessor;
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 818f405..b2d5823 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -19,26 +19,12 @@
package org.apache.iceberg.spark.data.vectorized;
-import java.util.List;
import java.util.Map;
-import java.util.stream.IntStream;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.arrow.ArrowAllocation;
-import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
-import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.ConstantVectorReader;
+import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.parquet.VectorizedReader;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
public class VectorizedSparkParquetReaders {
@@ -59,93 +45,8 @@ public class VectorizedSparkParquetReaders {
Map<Integer, ?> idToConstant) {
return (ColumnarBatchReader)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector, idToConstant));
- }
-
- private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
- private final MessageType parquetSchema;
- private final Schema icebergSchema;
- private final BufferAllocator rootAllocator;
- private final Map<Integer, ?> idToConstant;
- private final boolean setArrowValidityVector;
-
- VectorizedReaderBuilder(
- Schema expectedSchema,
- MessageType parquetSchema,
- boolean setArrowValidityVector, Map<Integer, ?> idToConstant) {
- this.parquetSchema = parquetSchema;
- this.icebergSchema = expectedSchema;
- this.rootAllocator = ArrowAllocation.rootAllocator()
- .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
- this.setArrowValidityVector = setArrowValidityVector;
- this.idToConstant = idToConstant;
- }
-
- @Override
- public VectorizedReader<?> message(
- Types.StructType expected, MessageType message,
- List<VectorizedReader<?>> fieldReaders) {
- GroupType groupType = message.asGroupType();
- Map<Integer, VectorizedReader<?>> readersById = Maps.newHashMap();
- List<Type> fields = groupType.getFields();
-
- IntStream.range(0, fields.size())
- .filter(pos -> fields.get(pos).getId() != null)
- .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos)));
-
- List<Types.NestedField> icebergFields = expected != null ?
- expected.fields() : ImmutableList.of();
-
- List<VectorizedReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
- icebergFields.size());
-
- for (Types.NestedField field : icebergFields) {
- int id = field.fieldId();
- VectorizedReader<?> reader = readersById.get(id);
- if (idToConstant.containsKey(id)) {
- reorderedFields.add(new ConstantVectorReader<>(idToConstant.get(id)));
- } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
- reorderedFields.add(VectorizedArrowReader.positions());
- } else if (reader != null) {
- reorderedFields.add(reader);
- } else {
- reorderedFields.add(VectorizedArrowReader.nulls());
- }
- }
- return new ColumnarBatchReader(reorderedFields);
- }
-
- @Override
- public VectorizedReader<?> struct(
- Types.StructType expected, GroupType groupType,
- List<VectorizedReader<?>> fieldReaders) {
- if (expected != null) {
- throw new UnsupportedOperationException("Vectorized reads are not supported yet for struct fields");
- }
- return null;
- }
-
- @Override
- public VectorizedReader<?> primitive(
- org.apache.iceberg.types.Type.PrimitiveType expected,
- PrimitiveType primitive) {
-
- // Create arrow vector for this field
- if (primitive.getId() == null) {
- return null;
- }
- int parquetFieldId = primitive.getId().intValue();
- ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
- // Nested types not yet supported for vectorized reads
- if (desc.getMaxRepetitionLevel() > 0) {
- return null;
- }
- Types.NestedField icebergField = icebergSchema.findField(parquetFieldId);
- if (icebergField == null) {
- return null;
- }
- // Set the validity buffer if null checking is enabled in arrow
- return new VectorizedArrowReader(desc, icebergField, rootAllocator, setArrowValidityVector);
- }
+ new VectorizedReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector,
+ idToConstant, ColumnarBatchReader::new));
}
}