Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/19 19:14:00 UTC
[incubator-iceberg] branch vectorized-read updated: Enable a path for consuming applications to not reuse underlying arrow buffers. (#707)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/vectorized-read by this push:
new 1961674 Enable a path for consuming applications to not reuse underlying arrow buffers. (#707)
1961674 is described below
commit 1961674582b9a6252a43cd2150564f798d478fef
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Thu Dec 19 11:13:48 2019 -0800
Enable a path for consuming applications to not reuse underlying arrow buffers. (#707)
Clean up interfaces and remove unused code.
---
.../iceberg/arrow/vectorized/VectorHolder.java | 11 ++++-
.../arrow/vectorized/VectorizedArrowReader.java | 18 +++++---
.../parquet/VectorizedParquetValuesReader.java | 10 ++---
.../java/org/apache/iceberg/parquet/Parquet.java | 6 +--
.../vectorized/VectorizedParquetReader.java | 21 ++++------
.../parquet/vectorized/VectorizedReader.java | 8 ++--
.../data/vectorized/ColumnarBatchReaders.java | 22 +++++-----
.../data/vectorized/IcebergArrowColumnVector.java | 4 +-
.../vectorized/VectorizedSparkParquetReaders.java | 9 ++--
.../org/apache/iceberg/spark/source/Reader.java | 2 -
.../iceberg/spark/source/VectorizedReading.java | 48 +---------------------
11 files changed, 61 insertions(+), 98 deletions(-)
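Before the diffs, a word on what the new path buys a consumer: with container reuse left off, each read() fills freshly allocated Arrow buffers, so batches returned earlier stay valid after the iterator moves on. A minimal consumer-side sketch, assuming the builder's reuse flag defaults to off (as in the row-based read path) and that build() returns a CloseableIterable as it does elsewhere in Iceberg; BatchCollector itself is illustrative, not part of this commit:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class BatchCollector {
      // Retains every batch of a file. Safe only when container reuse is
      // off; otherwise each batch aliases the same underlying Arrow buffers.
      static List<ColumnarBatch> collect(InputFile file, Schema tableSchema,
                                         Schema readSchema, int recordsPerBatch) {
        CloseableIterable<ColumnarBatch> batches = Parquet.read(file)
            .project(readSchema)
            .enableBatchedRead()
            .createBatchedReaderFunc(fileSchema ->
                VectorizedSparkParquetReaders.buildReader(
                    tableSchema, readSchema, fileSchema, recordsPerBatch))
            .recordsPerBatch(recordsPerBatch)
            // no .reuseContainers() call: read() allocates fresh vectors
            .build();
        List<ColumnarBatch> retained = new ArrayList<>();
        for (ColumnarBatch batch : batches) {
          retained.add(batch); // still valid after the loop moves on
        }
        return retained; // closing the iterable is omitted for brevity
      }
    }

With reuse enabled instead, the same loop would end with a list of aliases of one overwritten batch.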
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
index 5cf0ae8..33e7427 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
@@ -35,18 +35,21 @@ public class VectorHolder {
@Nullable
private final Dictionary dictionary;
+ private final NullabilityHolder nullabilityHolder;
- public static final VectorHolder NULL_VECTOR_HOLDER = new VectorHolder(null, null, false, null);
+ public static final VectorHolder NULL_VECTOR_HOLDER = new VectorHolder(null, null, false, null, null);
public VectorHolder(
ColumnDescriptor columnDescriptor,
FieldVector vector,
boolean isDictionaryEncoded,
- Dictionary dictionary) {
+ Dictionary dictionary,
+ NullabilityHolder holder) {
this.columnDescriptor = columnDescriptor;
this.vector = vector;
this.isDictionaryEncoded = isDictionaryEncoded;
this.dictionary = dictionary;
+ this.nullabilityHolder = holder;
}
public ColumnDescriptor getDescriptor() {
@@ -64,4 +67,8 @@ public class VectorHolder {
public Dictionary getDictionary() {
return dictionary;
}
+
+ public NullabilityHolder getNullabilityHolder() {
+ return nullabilityHolder;
+ }
}
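After this change the holder is the complete per-batch hand-off: vector, column descriptor, dictionary, and null bitmap travel together, so callers no longer thread a NullabilityHolder through read(). A small consumer sketch; isNullAt(int) returning a boolean is assumed for illustration, since this patch only shows the write side, setNull(int):

    import org.apache.arrow.vector.FieldVector;
    import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
    import org.apache.iceberg.arrow.vectorized.VectorHolder;

    class HolderConsumer {
      // Counts set values in one column batch using the bundled null bitmap.
      static long countNonNull(VectorHolder holder) {
        FieldVector vector = holder.getVector();
        NullabilityHolder nulls = holder.getNullabilityHolder();
        long nonNull = 0;
        for (int i = 0; i < vector.getValueCount(); i++) {
          if (!nulls.isNullAt(i)) { // isNullAt is assumed, see note above
            nonNull++;
          }
        }
        return nonNull;
      }
    }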
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 3b0f022..ab8e5b8 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -51,7 +51,7 @@ import org.apache.parquet.schema.PrimitiveType;
* It also takes care of allocating the right kind of Arrow vectors depending on the corresponding
* Iceberg/Parquet data types.
*/
-public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder, VectorHolder> {
+public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
public static final int DEFAULT_BATCH_SIZE = 5000;
public static final int UNKNOWN_WIDTH = -1;
@@ -71,6 +71,8 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
private final BufferAllocator rootAlloc;
private FieldVector vec;
private int typeWidth;
+ private boolean reuseContainers = true;
+ private NullabilityHolder nullabilityHolder;
// In cases when Parquet employs fall back encoding, we eagerly decode the dictionary encoded data
// before storing the values in the Arrow vector. This means even if the dictionary is present, data
@@ -122,11 +124,12 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
@SuppressWarnings("checkstyle:CyclomaticComplexity")
@Override
- public VectorHolder read(NullabilityHolder nullabilityHolder) {
- if (vec == null) {
+ public VectorHolder read() {
+ if (vec == null || !reuseContainers) {
typeWidth = allocateFieldVector();
}
vec.setValueCount(0);
+ nullabilityHolder = new NullabilityHolder(batchSize);
if (vectorizedColumnIterator.hasNext()) {
if (allPagesDictEncoded) {
vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
@@ -163,7 +166,7 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
}
}
}
- return new VectorHolder(columnDescriptor, vec, allPagesDictEncoded, dictionary);
+ return new VectorHolder(columnDescriptor, vec, allPagesDictEncoded, dictionary, nullabilityHolder);
}
private int allocateFieldVector() {
@@ -280,6 +283,11 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
}
@Override
+ public void reuseContainers(boolean reuse) {
+ this.reuseContainers = reuse;
+ }
+
+ @Override
public String toString() {
return columnDescriptor.toString();
}
@@ -287,7 +295,7 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
public static final VectorizedArrowReader NULL_VALUES_READER =
new VectorizedArrowReader() {
@Override
- public VectorHolder read(NullabilityHolder holder) {
+ public VectorHolder read() {
return VectorHolder.NULL_VECTOR_HOLDER;
}
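The reuse flag changes exactly one decision in read(): whether a fresh vector is allocated for each batch. A stand-alone toy showing the same allocation policy against a plain Arrow IntVector (names here are illustrative, not Iceberg API):

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;

    class ReusePatternDemo {
      private final BufferAllocator allocator = new RootAllocator();
      private IntVector vec;
      private boolean reuseContainers = true;

      void reuseContainers(boolean reuse) {
        this.reuseContainers = reuse;
      }

      IntVector read(int batchSize) {
        if (vec == null || !reuseContainers) {
          // fresh buffers for this batch; a consumer that retained the
          // previous vector now owns it and is responsible for close()
          vec = new IntVector("demo", allocator);
          vec.allocateNew(batchSize);
        }
        vec.setValueCount(0); // reset, then (re)fill
        for (int i = 0; i < batchSize; i++) {
          vec.setSafe(i, i);
        }
        vec.setValueCount(batchSize);
        return vec;
      }
    }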
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java
index 0e2256e..b4b6dc2 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java
@@ -1008,7 +1008,6 @@ public final class VectorizedParquetValuesReader extends ValuesReader {
switch (mode) {
case RLE:
for (int i = 0; i < num; i++) {
- // TODO: samarth I am assuming/hopeful that the decimalBytes array has typeWidth length
byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
@@ -1018,7 +1017,6 @@ public final class VectorizedParquetValuesReader extends ValuesReader {
break;
case PACKED:
for (int i = 0; i < num; i++) {
- // TODO: samarth I am assuming/hopeful that the decimal bytes has typeWidth length
byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
@@ -1198,13 +1196,11 @@ public final class VectorizedParquetValuesReader extends ValuesReader {
byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
- bufferIdx++;
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
} else {
- //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
- nullabilityHolder.setNull(bufferIdx);
- bufferIdx++;
+ setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
+ bufferIdx++;
}
break;
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 52a1cde..9ea7713 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -286,7 +286,7 @@ public class Parquet {
private Schema schema = null;
private Expression filter = null;
private ReadSupport<?> readSupport = null;
- private Function<MessageType, VectorizedReader> batchedReaderFunc = null;
+ private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
private boolean isBatchedReadEnabled = false;
private boolean filterRecords = true;
@@ -347,7 +347,7 @@ public class Parquet {
return this;
}
- public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader> func) {
+ public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
this.batchedReaderFunc = func;
return this;
}
@@ -404,7 +404,7 @@ public class Parquet {
ParquetReadOptions options = optionsBuilder.build();
if (isBatchedReadEnabled) {
- return new VectorizedParquetReader<>(file, schema, options, batchedReaderFunc, filter, reuseContainers,
+ return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers,
caseSensitive, maxRecordsPerBatch);
} else {
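On the Parquet.java typing fix: replacing the raw VectorizedReader with VectorizedReader<?> keeps createBatchedReaderFunc free of unchecked warnings, because the builder only touches members that do not mention the element type. A compile-clean illustration (configure() is hypothetical):

    import java.util.function.Function;
    import org.apache.iceberg.parquet.vectorized.VectorizedReader;
    import org.apache.parquet.schema.MessageType;

    class WildcardVsRaw {
      static void configure(Function<MessageType, VectorizedReader<?>> func,
                            MessageType schema, boolean reuse) {
        VectorizedReader<?> reader = func.apply(schema);
        reader.reuseContainers(reuse); // fine: signature does not mention T
        Object batch = reader.read();  // T surfaces only as Object here
      }
    }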
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
index 00528d5..2d89cd5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
@@ -52,7 +52,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
private final InputFile input;
private final Schema expectedSchema;
private final ParquetReadOptions options;
- private final Function<MessageType, VectorizedReader> batchReaderFunc;
+ private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
private final Expression filter;
private final boolean reuseContainers;
private final boolean caseSensitive;
@@ -60,7 +60,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
public VectorizedParquetReader(
InputFile input, Schema expectedSchema, ParquetReadOptions options,
- Function<MessageType, VectorizedReader> readerFunc,
+ Function<MessageType, VectorizedReader<T>> readerFunc,
Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
this.input = input;
this.expectedSchema = expectedSchema;
@@ -78,7 +78,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
private final InputFile file;
private final ParquetReadOptions options;
private final MessageType projection;
- private final VectorizedReader model;
+ private final VectorizedReader<T> model;
private final List<BlockMetaData> rowGroups;
private final boolean[] shouldSkip;
private final long totalValues;
@@ -87,7 +87,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
@SuppressWarnings("unchecked")
ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
- Function<MessageType, VectorizedReader> readerFunc, boolean reuseContainers,
+ Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
boolean caseSensitive, int bSize) {
this.file = file;
this.options = options;
@@ -190,7 +190,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
private ReadConf init() {
if (conf == null) {
- ReadConf<T> readConf = new ReadConf(
+ ReadConf readConf = new ReadConf(
input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
this.conf = readConf.copy();
return readConf;
@@ -209,9 +209,8 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
private static class FileIterator<T> implements Iterator<T>, Closeable {
private final ParquetFileReader reader;
private final boolean[] shouldSkip;
- private final VectorizedReader model;
+ private final VectorizedReader<T> model;
private final long totalValues;
- private final boolean reuseContainers;
private final int batchSize;
private int nextRowGroup = 0;
@@ -224,7 +223,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
this.shouldSkip = conf.shouldSkip();
this.model = conf.model();
this.totalValues = conf.totalValues();
- this.reuseContainers = conf.reuseContainers();
+ this.model.reuseContainers(conf.reuseContainers());
this.batchSize = conf.batchSize();
}
@@ -241,11 +240,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
if (valuesRead >= nextRowGroupStart) {
advance();
}
- if (reuseContainers) {
- this.last = (T) model.read(null);
- } else {
- this.last = (T) model.read(null);
- }
+ this.last = model.read();
valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
return last;
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
index 5fb78be..d181162 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
@@ -25,12 +25,14 @@ import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnPath;
/**
- * Marker interface for vectorized Iceberg readers.
+ * Interface for vectorized Iceberg readers.
*/
-public interface VectorizedReader<E, T> {
- T read(E input);
+public interface VectorizedReader<T> {
+ T read();
void setRowGroupInfo(PageReadStore pages, DictionaryPageReadStore dictionaryPageReadStore,
Map<ColumnPath, Boolean> columnPathBooleanMap);
+
+ void reuseContainers(boolean reuse);
}
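The narrowed contract in full, as a do-nothing implementation; CountingReader is illustrative only:

    import java.util.Map;
    import org.apache.iceberg.parquet.vectorized.VectorizedReader;
    import org.apache.parquet.column.page.DictionaryPageReadStore;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.metadata.ColumnPath;

    // One type parameter, a no-argument read(), and the new reuseContainers hook.
    class CountingReader implements VectorizedReader<Integer> {
      private boolean reuseContainers = true;
      private int batches = 0;

      @Override
      public Integer read() {
        return ++batches; // a real reader would return a filled batch here
      }

      @Override
      public void setRowGroupInfo(PageReadStore pages,
                                  DictionaryPageReadStore dictionaryPages,
                                  Map<ColumnPath, Boolean> dictEncodedColumns) {
        // a real reader would reset its column iterators for the new row group
      }

      @Override
      public void reuseContainers(boolean reuse) {
        this.reuseContainers = reuse;
      }
    }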
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java
index 18c2b12..d266fc1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java
@@ -23,15 +23,12 @@ import java.lang.reflect.Array;
import java.util.List;
import java.util.Map;
import org.apache.arrow.vector.FieldVector;
-import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
import org.apache.iceberg.parquet.vectorized.VectorizedReader;
-import org.apache.iceberg.types.Types;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.schema.Type;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -40,13 +37,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
* {@link ColumnarBatch} returned is created by passing in the Arrow vectors populated via delegated read calls to
* {@linkplain VectorizedArrowReader VectorReader(s)}.
*/
-public class ColumnarBatchReaders implements VectorizedReader<ColumnarBatch, ColumnarBatch> {
+public class ColumnarBatchReaders implements VectorizedReader<ColumnarBatch> {
private final VectorizedArrowReader[] readers;
private final int batchSize;
+ private boolean reuseContainers;
public ColumnarBatchReaders(
- List<Type> types,
- Types.StructType icebergExpectedFields,
List<VectorizedReader> readers,
int bSize) {
this.readers = (VectorizedArrowReader[]) Array.newInstance(
@@ -72,17 +68,23 @@ public class ColumnarBatchReaders implements VectorizedReader<ColumnarBatch, Col
}
@Override
- public final ColumnarBatch read(ColumnarBatch ignore) {
+ public void reuseContainers(boolean reuse) {
+ for (VectorizedReader reader : readers) {
+ reader.reuseContainers(reuse);
+ }
+ }
+
+ @Override
+ public final ColumnarBatch read() {
ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
int numRows = 0;
for (int i = 0; i < readers.length; i += 1) {
- NullabilityHolder nullabilityHolder = new NullabilityHolder(batchSize);
- VectorHolder holder = readers[i].read(nullabilityHolder);
+ VectorHolder holder = readers[i].read();
FieldVector vector = holder.getVector();
if (vector == null) {
arrowColumnVectors[i] = new NullValuesColumnVector(batchSize);
} else {
- arrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
+ arrowColumnVectors[i] = new IcebergArrowColumnVector(holder);
numRows = vector.getValueCount();
}
}
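Since IcebergArrowColumnVector now pulls its null information out of the VectorHolder, downstream code sees only standard Spark APIs. A sketch of consuming the batch returned by ColumnarBatchReaders.read(), assuming the first projected column is an int; rowIterator() and numRows() are standard ColumnarBatch API:

    import java.util.Iterator;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class BatchPrinter {
      static void dumpFirstColumn(ColumnarBatch batch) {
        System.out.println("rows in batch: " + batch.numRows());
        Iterator<InternalRow> rows = batch.rowIterator();
        while (rows.hasNext()) {
          InternalRow row = rows.next();
          System.out.println(row.isNullAt(0) ? "null" : row.getInt(0));
        }
      }
    }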
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index 076b5f3..ae6e44c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -70,9 +70,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
private final boolean isVectorDictEncoded;
private ArrowColumnVector[] childColumns;
- public IcebergArrowColumnVector(VectorHolder holder, NullabilityHolder nulls) {
+ public IcebergArrowColumnVector(VectorHolder holder) {
super(ArrowUtils.instance().fromArrowField(holder.getVector().getField()));
- this.nullabilityHolder = nulls;
+ this.nullabilityHolder = holder.getNullabilityHolder();
this.columnDescriptor = holder.getDescriptor();
this.dictionary = holder.getDictionary();
this.isVectorDictEncoded = holder.isDictionaryEncoded();
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 0bc96ae..8650ea6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -65,17 +65,17 @@ public class VectorizedSparkParquetReaders {
LOG.info("=> [VectorizedSparkParquetReaders] recordsPerBatch = {}", recordsPerBatch);
return (ColumnarBatchReaders)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new VectorReaderBuilder(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
+ new VectorizedReaderBuilder(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
}
- private static class VectorReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader> {
+ private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader> {
private final MessageType parquetSchema;
private final Schema projectedIcebergSchema;
private final Schema tableIcebergSchema;
private final BufferAllocator rootAllocator;
private final int recordsPerBatch;
- VectorReaderBuilder(
+ VectorizedReaderBuilder(
Schema tableSchema,
Schema projectedIcebergSchema,
MessageType parquetSchema,
@@ -134,8 +134,7 @@ public class VectorizedSparkParquetReaders {
types.add(null);
}
}
-
- return new ColumnarBatchReaders(types, expected, reorderedFields, recordsPerBatch);
+ return new ColumnarBatchReaders(reorderedFields, recordsPerBatch);
}
@Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 24a23dd..49bfd2a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -145,8 +145,6 @@ class Reader implements DataSourceReader,
this.numRecordsPerBatch = VectorizedArrowReader.DEFAULT_BATCH_SIZE;
}
- LOG.info("=> Set Config numRecordsPerBatch = {}", numRecordsPerBatch);
-
if (snapshotId != null && asOfTimestamp != null) {
throw new IllegalArgumentException(
"Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java b/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java
index 3e4d1b6..939acac 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java
@@ -30,9 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -44,11 +42,9 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.vectorized.VectorizedReader;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.Types;
-import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -194,8 +190,6 @@ public class VectorizedReading {
}
private Iterator<ColumnarBatch> open(FileScanTask task) {
- DataFile file = task.file();
-
// schema or rows returned by readers
Schema finalSchema = expectedSchema;
PartitionSpec spec = task.spec();
@@ -205,47 +199,17 @@ public class VectorizedReading {
StructType sparkType = SparkSchemaUtil.convert(finalSchema);
Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
-
- Schema iterSchema;
Iterator<ColumnarBatch> iter;
-
- // if (hasJoinedPartitionColumns) {
- // // schema used to read data files
- // Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
- // Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
- // PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
- // JoinedRow joined = new JoinedRow();
- //
- // InternalRow partition = convertToRow.apply(file.partition());
- // joined.withRight(partition);
- //
- // // create joined rows and project from the joined schema to the final schema
- // iterSchema = TypeUtil.join(readSchema, partitionSchema);
- // iter = Iterators.transform(open(task, readSchema), joined::withLeft);
- //
- // } else if (hasExtraFilterColumns) {
if (hasExtraFilterColumns) {
- // add projection to the final schema
- iterSchema = requiredSchema;
iter = open(task, requiredSchema);
} else {
- // return the base iterator
- iterSchema = finalSchema;
iter = open(task, finalSchema);
}
-
- // TODO: remove the projection by reporting the iterator's schema back to Spark
- // return Iterators.transform(iter,
- // APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
return iter;
}
private Iterator<ColumnarBatch> open(FileScanTask task, Schema readSchema) {
CloseableIterable<ColumnarBatch> iter;
- // if (task.isDataTask()) {
- // iter = newDataIterable(task.asDataTask(), readSchema);
- //
- // } else {
InputFile location = inputFiles.get(task.file().path().toString());
Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
@@ -256,11 +220,8 @@ public class VectorizedReading {
default:
throw new UnsupportedOperationException(
"Cannot read unknown format: " + task.file().format());
- // }
}
-
this.currentCloseable = iter;
-
return iter.iterator();
}
@@ -294,13 +255,8 @@ public class VectorizedReading {
.project(readSchema)
.split(task.start(), task.length())
.enableBatchedRead()
- .createBatchedReaderFunc(new Function<MessageType, VectorizedReader>() {
- @Override
- public VectorizedReader apply(MessageType fileSchema) {
- return VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema,
- fileSchema, numRecordsPerBatch);
- }
- })
+ .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema,
+ fileSchema, numRecordsPerBatch))
.filter(task.residual())
.caseSensitive(caseSensitive)
.recordsPerBatch(numRecordsPerBatch)
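The last hunk is a pure syntax cleanup: the anonymous Function is replaced by an equivalent lambda. The same equivalence, shown with stand-alone types so it compiles on its own:

    import java.util.function.Function;

    class LambdaCleanup {
      static Function<String, Integer> anonymous =
          new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
              return s.length();
            }
          };

      static Function<String, Integer> lambda = s -> s.length();
    }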