Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/07/27 01:42:46 UTC
[iceberg] branch master updated: Spark 3.2: Support different task types in readers (#5363)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7a92bf5e31 Spark 3.2: Support different task types in readers (#5363)
7a92bf5e31 is described below
commit 7a92bf5e31a995c61dc598d2a36f598d5c120756
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Tue Jul 26 18:42:41 2022 -0700
Spark 3.2: Support different task types in readers (#5363)
---
.../iceberg/spark/source/BaseBatchReader.java | 110 +++++++++++++
.../{BaseDataReader.java => BaseReader.java} | 128 +++++++++++----
.../apache/iceberg/spark/source/BaseRowReader.java | 104 +++++++++++++
.../iceberg/spark/source/BatchDataReader.java | 135 +++-------------
.../spark/source/EqualityDeleteRowReader.java | 11 +-
.../apache/iceberg/spark/source/RowDataReader.java | 173 +++------------------
...parkBaseDataReader.java => TestBaseReader.java} | 15 +-
7 files changed, 367 insertions(+), 309 deletions(-)
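At a glance, this commit replaces the FileScanTask-specific BaseDataReader with a BaseReader that is generic over the scan task type, and splits the format-specific plumbing into BaseBatchReader (vectorized ColumnarBatch reads) and BaseRowReader (InternalRow reads). The outline below is only an orientation aid distilled from the diff, with constructors and method bodies elided; it is not part of the commit itself.

    // Shape of the reader hierarchy after this change (bodies elided, not meant to compile).
    abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
      // holds the task group, lazy input-file map, name mapping, expected schema, case sensitivity
      protected abstract CloseableIterator<T> open(TaskT task);
      protected abstract Stream<ContentFile<?>> referencedFiles(TaskT task);
    }

    abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
      // newBatchIterable(...) dispatches to vectorized Parquet or ORC readers
    }

    abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
      // newIterable(...) dispatches to Parquet, Avro, or ORC row readers
    }

    class BatchDataReader extends BaseBatchReader<FileScanTask> { /* ... */ }
    class RowDataReader extends BaseRowReader<FileScanTask> { /* ... */ }
    public class EqualityDeleteRowReader extends RowDataReader { /* ... */ }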
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
new file mode 100644
index 0000000000..1b0b69299b
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
+ private final int batchSize;
+
+ BaseBatchReader(Table table, ScanTaskGroup<T> taskGroup, Schema expectedSchema, boolean caseSensitive,
+ int batchSize) {
+ super(table, taskGroup, expectedSchema, caseSensitive);
+ this.batchSize = batchSize;
+ }
+
+ protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile inputFile, FileFormat format,
+ long start, long length, Expression residual,
+ Map<Integer, ?> idToConstant,
+ SparkDeleteFilter deleteFilter) {
+ switch (format) {
+ case PARQUET:
+ return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+
+ case ORC:
+ return newOrcIterable(inputFile, start, length, residual, idToConstant);
+
+ default:
+ throw new UnsupportedOperationException("Format: " + format + " not supported for batched reads");
+ }
+ }
+
+ private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile inputFile, long start, long length,
+ Expression residual, Map<Integer, ?> idToConstant,
+ SparkDeleteFilter deleteFilter) {
+ // get the required schema for filtering out equality-delete rows, in case the columns used by
+ // equality deletes are not selected.
+ Schema requiredSchema = deleteFilter != null && deleteFilter.hasEqDeletes() ?
+ deleteFilter.requiredSchema() : expectedSchema();
+
+ return Parquet.read(inputFile)
+ .project(requiredSchema)
+ .split(start, length)
+ .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(requiredSchema,
+ fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
+ deleteFilter))
+ .recordsPerBatch(batchSize)
+ .filter(residual)
+ .caseSensitive(caseSensitive())
+ // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
+ // without worrying about subsequent reads clobbering over each other. This improves
+ // read performance as every batch read doesn't have to pay the cost of allocating memory.
+ .reuseContainers()
+ .withNameMapping(nameMapping())
+ .build();
+ }
+
+ private CloseableIterable<ColumnarBatch> newOrcIterable(InputFile inputFile, long start, long length,
+ Expression residual, Map<Integer, ?> idToConstant) {
+ Set<Integer> constantFieldIds = idToConstant.keySet();
+ Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
+ Sets.SetView<Integer> constantAndMetadataFieldIds = Sets.union(constantFieldIds, metadataFieldIds);
+ Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
+
+ return ORC.read(inputFile)
+ .project(schemaWithoutConstantAndMetadataFields)
+ .split(start, length)
+ .createBatchedReaderFunc(fileSchema -> VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema,
+ idToConstant))
+ .recordsPerBatch(batchSize)
+ .filter(residual)
+ .caseSensitive(caseSensitive())
+ .withNameMapping(nameMapping())
+ .build();
+ }
+}
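For orientation, a concrete subclass's open() plugs into newBatchIterable roughly as follows; this mirrors the BatchDataReader changes further down in this diff rather than adding anything new. Note that the delete filter is only threaded into the Parquet path; the ORC branch above does not take one.

    // Illustrative use of newBatchIterable from a FileScanTask-based subclass (see BatchDataReader below).
    @Override
    protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
      String filePath = task.file().path().toString();
      InputFile inputFile = getInputFile(filePath);
      SparkDeleteFilter deleteFilter =
          task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes());
      Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
      return newBatchIterable(inputFile, task.file().format(), task.start(), task.length(),
          task.residual(), idToConstant, deleteFilter).iterator();
    }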
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
similarity index 58%
rename from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
rename to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index f0664c7e8e..0210ceb177 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -26,29 +26,38 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
@@ -60,35 +69,46 @@ import org.slf4j.LoggerFactory;
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
+abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
- private final Map<String, InputFile> inputFiles;
+ private final Schema expectedSchema;
+ private final boolean caseSensitive;
+ private final NameMapping nameMapping;
+ private final ScanTaskGroup<TaskT> taskGroup;
+ private final Iterator<TaskT> tasks;
+ private Map<String, InputFile> lazyInputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private TaskT currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseReader(Table table, ScanTaskGroup<TaskT> taskGroup, Schema expectedSchema, boolean caseSensitive) {
this.table = table;
- this.tasks = task.files().iterator();
- Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
- task.files().stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
- .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
- Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
- .map(entry -> EncryptedFiles.encryptedInput(table.io().newInputFile(entry.getKey()), entry.getValue()));
+ this.taskGroup = taskGroup;
+ this.tasks = taskGroup.tasks().iterator();
+ this.currentIterator = CloseableIterator.empty();
+ this.expectedSchema = expectedSchema;
+ this.caseSensitive = caseSensitive;
+ String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+ this.nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+ }
- // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
- Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encrypted::iterator);
+ protected abstract CloseableIterator<T> open(TaskT task);
- Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
- decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
- this.inputFiles = ImmutableMap.copyOf(files);
+ protected abstract Stream<ContentFile<?>> referencedFiles(TaskT task);
- this.currentIterator = CloseableIterator.empty();
+ protected Schema expectedSchema() {
+ return expectedSchema;
+ }
+
+ protected boolean caseSensitive() {
+ return caseSensitive;
+ }
+
+ protected NameMapping nameMapping() {
+ return nameMapping;
}
protected Table table() {
@@ -112,7 +132,10 @@ abstract class BaseDataReader<T> implements Closeable {
}
} catch (IOException | RuntimeException e) {
if (currentTask != null && !currentTask.isDataTask()) {
- LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
+ String filePaths = referencedFiles(currentTask)
+ .map(file -> file.path().toString())
+ .collect(Collectors.joining(", "));
+ LOG.error("Error reading file(s): {}", filePaths, e);
}
throw e;
}
@@ -122,8 +145,6 @@ abstract class BaseDataReader<T> implements Closeable {
return current;
}
- abstract CloseableIterator<T> open(FileScanTask task);
-
@Override
public void close() throws IOException {
InputFileBlockHolder.unset();
@@ -137,21 +158,38 @@ abstract class BaseDataReader<T> implements Closeable {
}
}
- protected InputFile getInputFile(FileScanTask task) {
- Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
- return inputFiles.get(task.file().path().toString());
+ protected InputFile getInputFile(String location) {
+ return inputFiles().get(location);
}
- protected InputFile getInputFile(String location) {
- return inputFiles.get(location);
+ private Map<String, InputFile> inputFiles() {
+ if (lazyInputFiles == null) {
+ Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
+ .flatMap(this::referencedFiles)
+ .map(this::toEncryptedInputFile);
+
+ // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+ Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator);
+
+ Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(taskGroup.tasks().size());
+ decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+ this.lazyInputFiles = ImmutableMap.copyOf(files);
+ }
+
+ return lazyInputFiles;
+ }
+
+ private EncryptedInputFile toEncryptedInputFile(ContentFile<?> file) {
+ InputFile inputFile = table.io().newInputFile(file.path().toString());
+ return EncryptedFiles.encryptedInput(inputFile, file.keyMetadata());
}
- protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
+ protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
StructType partitionType = Partitioning.partitionType(table);
- return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
+ return PartitionUtil.constantsMap(task, partitionType, BaseReader::convertConstant);
} else {
- return PartitionUtil.constantsMap(task, BaseDataReader::convertConstant);
+ return PartitionUtil.constantsMap(task, BaseReader::convertConstant);
}
}
@@ -200,4 +238,28 @@ abstract class BaseDataReader<T> implements Closeable {
}
return value;
}
+
+ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+ private final InternalRowWrapper asStructLike;
+
+ SparkDeleteFilter(String filePath, List<DeleteFile> deletes) {
+ super(filePath, deletes, table.schema(), expectedSchema);
+ this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+ }
+
+ @Override
+ protected StructLike asStructLike(InternalRow row) {
+ return asStructLike.wrap(row);
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return BaseReader.this.getInputFile(location);
+ }
+
+ @Override
+ protected void markRowDeleted(InternalRow row) {
+ row.setBoolean(columnIsDeletedPosition(), true);
+ }
+ }
}
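The two abstract hooks are the entire contract for a new task type: referencedFiles() reports every data and delete file a task touches so BaseReader can resolve and decrypt them in a single batched call, and open() turns one task into an iterator of results. A hypothetical reader built on this base would look roughly like the sketch below; the class name and its placeholder body are illustrative only, not part of this commit.

    // Hypothetical sketch of a new reader built on BaseReader (illustrative names).
    class ExampleRowReader extends BaseReader<InternalRow, FileScanTask> {
      ExampleRowReader(Table table, ScanTaskGroup<FileScanTask> taskGroup,
                       Schema expectedSchema, boolean caseSensitive) {
        super(table, taskGroup, expectedSchema, caseSensitive);
      }

      @Override
      protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
        // report the data file plus its delete files so they are batch-decrypted up front
        return Stream.concat(Stream.of(task.file()), task.deletes().stream());
      }

      @Override
      protected CloseableIterator<InternalRow> open(FileScanTask task) {
        // a real reader would resolve getInputFile(...) and build a format-specific iterator here
        return CloseableIterator.empty();
      }
    }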
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
new file mode 100644
index 0000000000..3aeb65d7ce
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
+ BaseRowReader(Table table, ScanTaskGroup<T> taskGroup, Schema expectedSchema, boolean caseSensitive) {
+ super(table, taskGroup, expectedSchema, caseSensitive);
+ }
+
+ protected CloseableIterable<InternalRow> newIterable(InputFile file, FileFormat format, long start, long length,
+ Expression residual, Schema projection,
+ Map<Integer, ?> idToConstant) {
+ switch (format) {
+ case PARQUET:
+ return newParquetIterable(file, start, length, residual, projection, idToConstant);
+
+ case AVRO:
+ return newAvroIterable(file, start, length, projection, idToConstant);
+
+ case ORC:
+ return newOrcIterable(file, start, length, residual, projection, idToConstant);
+
+ default:
+ throw new UnsupportedOperationException("Cannot read unknown format: " + format);
+ }
+ }
+
+ private CloseableIterable<InternalRow> newAvroIterable(InputFile file, long start, long length, Schema projection,
+ Map<Integer, ?> idToConstant) {
+ return Avro.read(file)
+ .reuseContainers()
+ .project(projection)
+ .split(start, length)
+ .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant))
+ .withNameMapping(nameMapping())
+ .build();
+ }
+
+ private CloseableIterable<InternalRow> newParquetIterable(InputFile file, long start, long length,
+ Expression residual, Schema readSchema,
+ Map<Integer, ?> idToConstant) {
+ return Parquet.read(file)
+ .reuseContainers()
+ .split(start, length)
+ .project(readSchema)
+ .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
+ .filter(residual)
+ .caseSensitive(caseSensitive())
+ .withNameMapping(nameMapping())
+ .build();
+ }
+
+ private CloseableIterable<InternalRow> newOrcIterable(InputFile file, long start, long length, Expression residual,
+ Schema readSchema, Map<Integer, ?> idToConstant) {
+ Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
+ Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+
+ return ORC.read(file)
+ .project(readSchemaWithoutConstantAndMetadataFields)
+ .split(start, length)
+ .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
+ .filter(residual)
+ .caseSensitive(caseSensitive())
+ .withNameMapping(nameMapping())
+ .build();
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 5f643fa37d..f0b8dda234 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -20,139 +20,44 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import java.util.Set;
-import org.apache.arrow.vector.NullCheckingForGet;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
-class BatchDataReader extends BaseDataReader<ColumnarBatch> {
- private final Schema expectedSchema;
- private final String nameMapping;
- private final boolean caseSensitive;
- private final int batchSize;
-
- BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
- super(table, task);
- this.expectedSchema = expectedSchema;
- this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
- this.caseSensitive = caseSensitive;
- this.batchSize = size;
+class BatchDataReader extends BaseBatchReader<FileScanTask> {
+ BatchDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema expectedSchema, boolean caseSensitive,
+ int size) {
+ super(table, task, expectedSchema, caseSensitive, size);
}
@Override
- CloseableIterator<ColumnarBatch> open(FileScanTask task) {
- DataFile file = task.file();
-
- // update the current file for Spark's filename() function
- InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
- Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
-
- CloseableIterable<ColumnarBatch> iter;
- InputFile location = getInputFile(task);
- Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
- if (task.file().format() == FileFormat.PARQUET) {
- SparkDeleteFilter deleteFilter = deleteFilter(task);
- // get required schema for filtering out equality-delete rows in case equality-delete uses columns are
- // not selected.
- Schema requiredSchema = requiredSchema(deleteFilter);
-
- Parquet.ReadBuilder builder = Parquet.read(location)
- .project(requiredSchema)
- .split(task.start(), task.length())
- .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(requiredSchema,
- fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
- deleteFilter))
- .recordsPerBatch(batchSize)
- .filter(task.residual())
- .caseSensitive(caseSensitive)
- // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
- // without worrying about subsequent reads clobbering over each other. This improves
- // read performance as every batch read doesn't have to pay the cost of allocating memory.
- .reuseContainers();
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- iter = builder.build();
- } else if (task.file().format() == FileFormat.ORC) {
- Set<Integer> constantFieldIds = idToConstant.keySet();
- Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
- Sets.SetView<Integer> constantAndMetadataFieldIds = Sets.union(constantFieldIds, metadataFieldIds);
- Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(expectedSchema, constantAndMetadataFieldIds);
- ORC.ReadBuilder builder = ORC.read(location)
- .project(schemaWithoutConstantAndMetadataFields)
- .split(task.start(), task.length())
- .createBatchedReaderFunc(fileSchema -> VectorizedSparkOrcReaders.buildReader(expectedSchema, fileSchema,
- idToConstant))
- .recordsPerBatch(batchSize)
- .filter(task.residual())
- .caseSensitive(caseSensitive);
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- iter = builder.build();
- } else {
- throw new UnsupportedOperationException(
- "Format: " + task.file().format() + " not supported for batched reads");
- }
- return iter.iterator();
+ protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+ return Stream.concat(Stream.of(task.file()), task.deletes().stream());
}
- private SparkDeleteFilter deleteFilter(FileScanTask task) {
- return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema);
- }
+ @Override
+ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
+ String filePath = task.file().path().toString();
- private Schema requiredSchema(DeleteFilter deleteFilter) {
- if (deleteFilter != null && deleteFilter.hasEqDeletes()) {
- return deleteFilter.requiredSchema();
- } else {
- return expectedSchema;
- }
- }
+ // update the current file for Spark's filename() function
+ InputFileBlockHolder.set(filePath, task.start(), task.length());
- private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
- private final InternalRowWrapper asStructLike;
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
- SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
- super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
- this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
- }
+ InputFile inputFile = getInputFile(filePath);
+ Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
- @Override
- protected StructLike asStructLike(InternalRow row) {
- return asStructLike.wrap(row);
- }
+ SparkDeleteFilter deleteFilter = task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes());
- @Override
- protected InputFile getInputFile(String location) {
- return BatchDataReader.this.getInputFile(location);
- }
+ return newBatchIterable(inputFile, task.file().format(), task.start(), task.length(), task.residual(),
+ idToConstant, deleteFilter).iterator();
}
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index ce2226f4f7..9441e8c4a2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -30,20 +30,17 @@ import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
public class EqualityDeleteRowReader extends RowDataReader {
- private final Schema expectedSchema;
-
public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
- super(task, table, table.schema(), caseSensitive);
- this.expectedSchema = expectedSchema;
+ super(task, table, expectedSchema, caseSensitive);
}
@Override
- CloseableIterator<InternalRow> open(FileScanTask task) {
- SparkDeleteFilter matches = new SparkDeleteFilter(task, tableSchema(), expectedSchema);
+ protected CloseableIterator<InternalRow> open(FileScanTask task) {
+ SparkDeleteFilter matches = new SparkDeleteFilter(task.file().path().toString(), task.deletes());
// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
- Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
DataFile file = task.file();
// update the current file for Spark's filename() function
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 6ebceee2f6..8590346197 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,185 +20,58 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
-class RowDataReader extends BaseDataReader<InternalRow> {
-
- private final Schema tableSchema;
- private final Schema expectedSchema;
- private final String nameMapping;
- private final boolean caseSensitive;
+class RowDataReader extends BaseRowReader<FileScanTask> {
+ RowDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema expectedSchema, boolean caseSensitive) {
+ super(table, task, expectedSchema, caseSensitive);
+ }
- RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
- super(table, task);
- this.tableSchema = table.schema();
- this.expectedSchema = expectedSchema;
- this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
- this.caseSensitive = caseSensitive;
+ @Override
+ protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+ return Stream.concat(Stream.of(task.file()), task.deletes().stream());
}
@Override
- CloseableIterator<InternalRow> open(FileScanTask task) {
- SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);
+ protected CloseableIterator<InternalRow> open(FileScanTask task) {
+ String filePath = task.file().path().toString();
+ SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes());
// schema or rows returned by readers
- Schema requiredSchema = deletes.requiredSchema();
- Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
- DataFile file = task.file();
+ Schema requiredSchema = deleteFilter.requiredSchema();
+ Map<Integer, ?> idToConstant = constantsMap(task, requiredSchema);
// update the current file for Spark's filename() function
- InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
- return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
- }
+ InputFileBlockHolder.set(filePath, task.start(), task.length());
- protected Schema tableSchema() {
- return tableSchema;
+ return deleteFilter.filter(open(task, requiredSchema, idToConstant)).iterator();
}
protected CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
- CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
- iter = newDataIterable(task.asDataTask(), readSchema);
+ return newDataIterable(task.asDataTask(), readSchema);
} else {
- InputFile location = getInputFile(task);
- Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
-
- switch (task.file().format()) {
- case PARQUET:
- iter = newParquetIterable(location, task, readSchema, idToConstant);
- break;
-
- case AVRO:
- iter = newAvroIterable(location, task, readSchema, idToConstant);
- break;
-
- case ORC:
- iter = newOrcIterable(location, task, readSchema, idToConstant);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Cannot read unknown format: " + task.file().format());
- }
- }
-
- return iter;
- }
-
- private CloseableIterable<InternalRow> newAvroIterable(
- InputFile location,
- FileScanTask task,
- Schema projection,
- Map<Integer, ?> idToConstant) {
- Avro.ReadBuilder builder = Avro.read(location)
- .reuseContainers()
- .project(projection)
- .split(task.start(), task.length())
- .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant));
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- return builder.build();
- }
-
- private CloseableIterable<InternalRow> newParquetIterable(
- InputFile location,
- FileScanTask task,
- Schema readSchema,
- Map<Integer, ?> idToConstant) {
- Parquet.ReadBuilder builder = Parquet.read(location)
- .reuseContainers()
- .split(task.start(), task.length())
- .project(readSchema)
- .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
- .filter(task.residual())
- .caseSensitive(caseSensitive);
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ InputFile inputFile = getInputFile(task.file().path().toString());
+ Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
+ return newIterable(inputFile, task.file().format(), task.start(), task.length(), task.residual(), readSchema,
+ idToConstant);
}
-
- return builder.build();
- }
-
- private CloseableIterable<InternalRow> newOrcIterable(
- InputFile location,
- FileScanTask task,
- Schema readSchema,
- Map<Integer, ?> idToConstant) {
- Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
- Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
- ORC.ReadBuilder builder = ORC.read(location)
- .project(readSchemaWithoutConstantAndMetadataFields)
- .split(task.start(), task.length())
- .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
- .filter(task.residual())
- .caseSensitive(caseSensitive);
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- return builder.build();
}
private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
StructInternalRow row = new StructInternalRow(readSchema.asStruct());
- CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
- task.asDataTask().rows(), row::setStruct);
- return asSparkRows;
- }
-
- protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
- private final InternalRowWrapper asStructLike;
-
- SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
- super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
- this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
- }
-
- @Override
- protected StructLike asStructLike(InternalRow row) {
- return asStructLike.wrap(row);
- }
-
- @Override
- protected InputFile getInputFile(String location) {
- return RowDataReader.this.getInputFile(location);
- }
-
- @Override
- protected void markRowDeleted(InternalRow row) {
- row.setBoolean(columnIsDeletedPosition(), true);
- }
+ return CloseableIterable.transform(task.asDataTask().rows(), row::setStruct);
}
}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
similarity index 95%
rename from spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
rename to spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
index 870be890da..7f15cb28fa 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
@@ -27,10 +27,12 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
@@ -51,7 +53,7 @@ import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.FileFormat.PARQUET;
import static org.apache.iceberg.Files.localOutput;
-public class TestSparkBaseDataReader {
+public class TestBaseReader {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@@ -86,15 +88,20 @@ public class TestSparkBaseDataReader {
// Main reader class to test base class iteration logic.
// Keeps track of iterator closure.
- private static class ClosureTrackingReader extends BaseDataReader<Integer> {
+ private static class ClosureTrackingReader extends BaseReader<Integer, FileScanTask> {
private Map<String, CloseableIntegerRange> tracker = Maps.newHashMap();
ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
- super(table, new BaseCombinedScanTask(tasks));
+ super(table, new BaseCombinedScanTask(tasks), null, false);
}
@Override
- CloseableIterator<Integer> open(FileScanTask task) {
+ protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+ return Stream.of();
+ }
+
+ @Override
+ protected CloseableIterator<Integer> open(FileScanTask task) {
CloseableIntegerRange intRange = new CloseableIntegerRange(task.file().recordCount());
tracker.put(getKey(task), intRange);
return intRange;