Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/07/27 01:42:46 UTC

[iceberg] branch master updated: Spark 3.2: Support different task types in readers (#5363)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a92bf5e31 Spark 3.2: Support different task types in readers (#5363)
7a92bf5e31 is described below

commit 7a92bf5e31a995c61dc598d2a36f598d5c120756
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Tue Jul 26 18:42:41 2022 -0700

    Spark 3.2: Support different task types in readers (#5363)
---
 .../iceberg/spark/source/BaseBatchReader.java      | 110 +++++++++++++
 .../{BaseDataReader.java => BaseReader.java}       | 128 +++++++++++----
 .../apache/iceberg/spark/source/BaseRowReader.java | 104 +++++++++++++
 .../iceberg/spark/source/BatchDataReader.java      | 135 +++-------------
 .../spark/source/EqualityDeleteRowReader.java      |  11 +-
 .../apache/iceberg/spark/source/RowDataReader.java | 173 +++------------------
 ...parkBaseDataReader.java => TestBaseReader.java} |  15 +-
 7 files changed, 367 insertions(+), 309 deletions(-)
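
For orientation only, here is a sketch of the contract the new hierarchy establishes: a concrete reader now extends BaseReader<T, TaskT> and supplies just open() and referencedFiles(), while task iteration, batch decryption of input files, constant maps, and the delete filter live in the base class. The ExampleRowReader class below is hypothetical and not part of this commit; everything else it references appears in the diff that follows.

package org.apache.iceberg.spark.source;

import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical subclass illustrating the new BaseReader contract; not part of this commit.
class ExampleRowReader extends BaseReader<InternalRow, FileScanTask> {
  ExampleRowReader(Table table, ScanTaskGroup<FileScanTask> taskGroup,
                   Schema expectedSchema, boolean caseSensitive) {
    super(table, taskGroup, expectedSchema, caseSensitive);
  }

  @Override
  protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
    // report the data file and its delete files so the base class can batch-decrypt them
    return Stream.concat(Stream.of(task.file()), task.deletes().stream());
  }

  @Override
  protected CloseableIterator<InternalRow> open(FileScanTask task) {
    // a real reader builds a format-specific iterator here (see BaseRowReader/BaseBatchReader below)
    return CloseableIterator.empty();
  }
}
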

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
new file mode 100644
index 0000000000..1b0b69299b
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
+  private final int batchSize;
+
+  BaseBatchReader(Table table, ScanTaskGroup<T> taskGroup, Schema expectedSchema, boolean caseSensitive,
+                  int batchSize) {
+    super(table, taskGroup, expectedSchema, caseSensitive);
+    this.batchSize = batchSize;
+  }
+
+  protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile inputFile, FileFormat format,
+                                                              long start, long length, Expression residual,
+                                                              Map<Integer, ?> idToConstant,
+                                                              SparkDeleteFilter deleteFilter) {
+    switch (format) {
+      case PARQUET:
+        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+
+      case ORC:
+        return newOrcIterable(inputFile, start, length, residual, idToConstant);
+
+      default:
+        throw new UnsupportedOperationException("Format: " + format + " not supported for batched reads");
+    }
+  }
+
+  private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile inputFile, long start, long length,
+                                                              Expression residual, Map<Integer, ?> idToConstant,
+                                                              SparkDeleteFilter deleteFilter) {
+    // get the required schema to filter out equality-delete rows in case the columns used by
+    // equality deletes are not selected.
+    Schema requiredSchema = deleteFilter != null && deleteFilter.hasEqDeletes() ?
+        deleteFilter.requiredSchema() : expectedSchema();
+
+    return Parquet.read(inputFile)
+        .project(requiredSchema)
+        .split(start, length)
+        .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(requiredSchema,
+            fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
+            deleteFilter))
+        .recordsPerBatch(batchSize)
+        .filter(residual)
+        .caseSensitive(caseSensitive())
+        // Spark eagerly consumes the batches, so the underlying memory allocated can be reused
+        // without worrying about subsequent reads clobbering each other. This improves read
+        // performance because each batch read does not have to pay the cost of allocating memory.
+        .reuseContainers()
+        .withNameMapping(nameMapping())
+        .build();
+  }
+
+  private CloseableIterable<ColumnarBatch> newOrcIterable(InputFile inputFile, long start, long length,
+                                                          Expression residual, Map<Integer, ?> idToConstant) {
+    Set<Integer> constantFieldIds = idToConstant.keySet();
+    Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
+    Sets.SetView<Integer> constantAndMetadataFieldIds = Sets.union(constantFieldIds, metadataFieldIds);
+    Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
+
+    return ORC.read(inputFile)
+        .project(schemaWithoutConstantAndMetadataFields)
+        .split(start, length)
+        .createBatchedReaderFunc(fileSchema -> VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema,
+            idToConstant))
+        .recordsPerBatch(batchSize)
+        .filter(residual)
+        .caseSensitive(caseSensitive())
+        .withNameMapping(nameMapping())
+        .build();
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
similarity index 58%
rename from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
rename to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index f0664c7e8e..0210ceb177 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -26,29 +26,38 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedInputFile;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -60,35 +69,46 @@ import org.slf4j.LoggerFactory;
  *
  * @param <T> is the Java class returned by this reader whose objects contain one or more rows.
  */
-abstract class BaseDataReader<T> implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
+abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
 
   private final Table table;
-  private final Iterator<FileScanTask> tasks;
-  private final Map<String, InputFile> inputFiles;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final NameMapping nameMapping;
+  private final ScanTaskGroup<TaskT> taskGroup;
+  private final Iterator<TaskT> tasks;
 
+  private Map<String, InputFile> lazyInputFiles;
   private CloseableIterator<T> currentIterator;
   private T current = null;
-  private FileScanTask currentTask = null;
+  private TaskT currentTask = null;
 
-  BaseDataReader(Table table, CombinedScanTask task) {
+  BaseReader(Table table, ScanTaskGroup<TaskT> taskGroup, Schema expectedSchema, boolean caseSensitive) {
     this.table = table;
-    this.tasks = task.files().iterator();
-    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
-    task.files().stream()
-        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
-        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
-    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
-        .map(entry -> EncryptedFiles.encryptedInput(table.io().newInputFile(entry.getKey()), entry.getValue()));
+    this.taskGroup = taskGroup;
+    this.tasks = taskGroup.tasks().iterator();
+    this.currentIterator = CloseableIterator.empty();
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = caseSensitive;
+    String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    this.nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+  }
 
-    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
-    Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encrypted::iterator);
+  protected abstract CloseableIterator<T> open(TaskT task);
 
-    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
-    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
-    this.inputFiles = ImmutableMap.copyOf(files);
+  protected abstract Stream<ContentFile<?>> referencedFiles(TaskT task);
 
-    this.currentIterator = CloseableIterator.empty();
+  protected Schema expectedSchema() {
+    return expectedSchema;
+  }
+
+  protected boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  protected NameMapping nameMapping() {
+    return nameMapping;
   }
 
   protected Table table() {
@@ -112,7 +132,10 @@ abstract class BaseDataReader<T> implements Closeable {
       }
     } catch (IOException | RuntimeException e) {
       if (currentTask != null && !currentTask.isDataTask()) {
-        LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
+        String filePaths = referencedFiles(currentTask)
+            .map(file -> file.path().toString())
+            .collect(Collectors.joining(", "));
+        LOG.error("Error reading file(s): {}", filePaths, e);
       }
       throw e;
     }
@@ -122,8 +145,6 @@ abstract class BaseDataReader<T> implements Closeable {
     return current;
   }
 
-  abstract CloseableIterator<T> open(FileScanTask task);
-
   @Override
   public void close() throws IOException {
     InputFileBlockHolder.unset();
@@ -137,21 +158,38 @@ abstract class BaseDataReader<T> implements Closeable {
     }
   }
 
-  protected InputFile getInputFile(FileScanTask task) {
-    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
-    return inputFiles.get(task.file().path().toString());
+  protected InputFile getInputFile(String location) {
+    return inputFiles().get(location);
   }
 
-  protected InputFile getInputFile(String location) {
-    return inputFiles.get(location);
+  private Map<String, InputFile> inputFiles() {
+    if (lazyInputFiles == null) {
+      Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
+          .flatMap(this::referencedFiles)
+          .map(this::toEncryptedInputFile);
+
+      // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+      Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator);
+
+      Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(taskGroup.tasks().size());
+      decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+      this.lazyInputFiles = ImmutableMap.copyOf(files);
+    }
+
+    return lazyInputFiles;
+  }
+
+  private EncryptedInputFile toEncryptedInputFile(ContentFile<?> file) {
+    InputFile inputFile = table.io().newInputFile(file.path().toString());
+    return EncryptedFiles.encryptedInput(inputFile, file.keyMetadata());
   }
 
-  protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
+  protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
     if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
       StructType partitionType = Partitioning.partitionType(table);
-      return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
+      return PartitionUtil.constantsMap(task, partitionType, BaseReader::convertConstant);
     } else {
-      return PartitionUtil.constantsMap(task, BaseDataReader::convertConstant);
+      return PartitionUtil.constantsMap(task, BaseReader::convertConstant);
     }
   }
 
@@ -200,4 +238,28 @@ abstract class BaseDataReader<T> implements Closeable {
     }
     return value;
   }
+
+  protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+    private final InternalRowWrapper asStructLike;
+
+    SparkDeleteFilter(String filePath, List<DeleteFile> deletes) {
+      super(filePath, deletes, table.schema(), expectedSchema);
+      this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+    }
+
+    @Override
+    protected StructLike asStructLike(InternalRow row) {
+      return asStructLike.wrap(row);
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return BaseReader.this.getInputFile(location);
+    }
+
+    @Override
+    protected void markRowDeleted(InternalRow row) {
+      row.setBoolean(columnIsDeletedPosition(), true);
+    }
+  }
 }
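
A small aside on the SparkDeleteFilter that now lives in BaseReader: it is built from a data file path plus that file's delete files, equality deletes can widen the projection via requiredSchema(), and filter(...) lazily drops deleted rows from whatever iterable a subclass produces. The applyDeletes helper below is a hypothetical illustration, not commit code; it would sit inside a BaseReader subclass and reuse that class's imports.

  // Hypothetical helper; applyDeletes and its parameters are placeholders, not commit code.
  private CloseableIterable<InternalRow> applyDeletes(String filePath, List<DeleteFile> deletes,
                                                      CloseableIterable<InternalRow> rows) {
    if (deletes.isEmpty()) {
      return rows;
    }

    // positional and equality deletes are applied lazily as rows stream through; a real reader
    // would first project with deleteFilter.requiredSchema(), since equality deletes may need
    // columns beyond the requested projection
    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, deletes);
    return deleteFilter.filter(rows);
  }
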
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
new file mode 100644
index 0000000000..3aeb65d7ce
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
+  BaseRowReader(Table table, ScanTaskGroup<T> taskGroup, Schema expectedSchema, boolean caseSensitive) {
+    super(table, taskGroup, expectedSchema, caseSensitive);
+  }
+
+  protected CloseableIterable<InternalRow> newIterable(InputFile file, FileFormat format, long start, long length,
+                                                       Expression residual, Schema projection,
+                                                       Map<Integer, ?> idToConstant) {
+    switch (format) {
+      case PARQUET:
+        return newParquetIterable(file, start, length, residual, projection, idToConstant);
+
+      case AVRO:
+        return newAvroIterable(file, start, length, projection, idToConstant);
+
+      case ORC:
+        return newOrcIterable(file, start, length, residual, projection, idToConstant);
+
+      default:
+        throw new UnsupportedOperationException("Cannot read unknown format: " + format);
+    }
+  }
+
+  private CloseableIterable<InternalRow> newAvroIterable(InputFile file, long start, long length, Schema projection,
+                                                         Map<Integer, ?> idToConstant) {
+    return Avro.read(file)
+        .reuseContainers()
+        .project(projection)
+        .split(start, length)
+        .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant))
+        .withNameMapping(nameMapping())
+        .build();
+  }
+
+  private CloseableIterable<InternalRow> newParquetIterable(InputFile file, long start, long length,
+                                                            Expression residual, Schema readSchema,
+                                                            Map<Integer, ?> idToConstant) {
+    return Parquet.read(file)
+        .reuseContainers()
+        .split(start, length)
+        .project(readSchema)
+        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
+        .filter(residual)
+        .caseSensitive(caseSensitive())
+        .withNameMapping(nameMapping())
+        .build();
+  }
+
+  private CloseableIterable<InternalRow> newOrcIterable(InputFile file, long start, long length, Expression residual,
+                                                        Schema readSchema, Map<Integer, ?> idToConstant) {
+    Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
+        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+
+    return ORC.read(file)
+        .project(readSchemaWithoutConstantAndMetadataFields)
+        .split(start, length)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
+        .filter(residual)
+        .caseSensitive(caseSensitive())
+        .withNameMapping(nameMapping())
+        .build();
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 5f643fa37d..f0b8dda234 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -20,139 +20,44 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
-import org.apache.arrow.vector.NullCheckingForGet;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
-class BatchDataReader extends BaseDataReader<ColumnarBatch> {
-  private final Schema expectedSchema;
-  private final String nameMapping;
-  private final boolean caseSensitive;
-  private final int batchSize;
-
-  BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
-    super(table, task);
-    this.expectedSchema = expectedSchema;
-    this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
-    this.caseSensitive = caseSensitive;
-    this.batchSize = size;
+class BatchDataReader extends BaseBatchReader<FileScanTask> {
+  BatchDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema expectedSchema, boolean caseSensitive,
+                  int size) {
+    super(table, task, expectedSchema, caseSensitive, size);
   }
 
   @Override
-  CloseableIterator<ColumnarBatch> open(FileScanTask task) {
-    DataFile file = task.file();
-
-    // update the current file for Spark's filename() function
-    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
-
-    CloseableIterable<ColumnarBatch> iter;
-    InputFile location = getInputFile(task);
-    Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
-    if (task.file().format() == FileFormat.PARQUET) {
-      SparkDeleteFilter deleteFilter = deleteFilter(task);
-      // get required schema for filtering out equality-delete rows in case equality-delete uses columns are
-      // not selected.
-      Schema requiredSchema = requiredSchema(deleteFilter);
-
-      Parquet.ReadBuilder builder = Parquet.read(location)
-          .project(requiredSchema)
-          .split(task.start(), task.length())
-          .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(requiredSchema,
-              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
-              deleteFilter))
-          .recordsPerBatch(batchSize)
-          .filter(task.residual())
-          .caseSensitive(caseSensitive)
-          // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
-          // without worrying about subsequent reads clobbering over each other. This improves
-          // read performance as every batch read doesn't have to pay the cost of allocating memory.
-          .reuseContainers();
-
-      if (nameMapping != null) {
-        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      iter = builder.build();
-    } else if (task.file().format() == FileFormat.ORC) {
-      Set<Integer> constantFieldIds = idToConstant.keySet();
-      Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
-      Sets.SetView<Integer> constantAndMetadataFieldIds = Sets.union(constantFieldIds, metadataFieldIds);
-      Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(expectedSchema, constantAndMetadataFieldIds);
-      ORC.ReadBuilder builder = ORC.read(location)
-          .project(schemaWithoutConstantAndMetadataFields)
-          .split(task.start(), task.length())
-          .createBatchedReaderFunc(fileSchema -> VectorizedSparkOrcReaders.buildReader(expectedSchema, fileSchema,
-              idToConstant))
-          .recordsPerBatch(batchSize)
-          .filter(task.residual())
-          .caseSensitive(caseSensitive);
-
-      if (nameMapping != null) {
-        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      iter = builder.build();
-    } else {
-      throw new UnsupportedOperationException(
-          "Format: " + task.file().format() + " not supported for batched reads");
-    }
-    return iter.iterator();
+  protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+    return Stream.concat(Stream.of(task.file()), task.deletes().stream());
   }
 
-  private SparkDeleteFilter deleteFilter(FileScanTask task) {
-    return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema);
-  }
+  @Override
+  protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
+    String filePath = task.file().path().toString();
 
-  private Schema requiredSchema(DeleteFilter deleteFilter) {
-    if (deleteFilter != null && deleteFilter.hasEqDeletes()) {
-      return deleteFilter.requiredSchema();
-    } else {
-      return expectedSchema;
-    }
-  }
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
 
-  private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
-    private final InternalRowWrapper asStructLike;
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
 
-    SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
-      super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
-      this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
-    }
+    InputFile inputFile = getInputFile(filePath);
+    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
 
-    @Override
-    protected StructLike asStructLike(InternalRow row) {
-      return asStructLike.wrap(row);
-    }
+    SparkDeleteFilter deleteFilter = task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes());
 
-    @Override
-    protected InputFile getInputFile(String location) {
-      return BatchDataReader.this.getInputFile(location);
-    }
+    return newBatchIterable(inputFile, task.file().format(), task.start(), task.length(), task.residual(),
+        idToConstant, deleteFilter).iterator();
   }
 }
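
For completeness, a rough sketch of how a caller might drive the rewritten BatchDataReader; readTaskGroup and its arguments are placeholders rather than commit code, and the next()/get()/close() iteration protocol is assumed to be inherited unchanged from the old BaseDataReader.

  // Hypothetical driver loop; readTaskGroup, its parameters, and the batch size are placeholders.
  static long readTaskGroup(ScanTaskGroup<FileScanTask> taskGroup, Table table,
                            Schema expectedSchema) throws IOException {
    long batches = 0;
    try (BatchDataReader reader = new BatchDataReader(taskGroup, table, expectedSchema,
        /* caseSensitive */ true, /* batchSize */ 4096)) {
      while (reader.next()) {                // advances through the tasks in the group
        ColumnarBatch batch = reader.get();  // batch produced by newBatchIterable(...)
        batches += 1;                        // a real caller would hand the batch to Spark
      }
    }
    return batches;
  }
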
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index ce2226f4f7..9441e8c4a2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -30,20 +30,17 @@ import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 public class EqualityDeleteRowReader extends RowDataReader {
-  private final Schema expectedSchema;
-
   public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(task, table, table.schema(), caseSensitive);
-    this.expectedSchema = expectedSchema;
+    super(task, table, expectedSchema, caseSensitive);
   }
 
   @Override
-  CloseableIterator<InternalRow> open(FileScanTask task) {
-    SparkDeleteFilter matches = new SparkDeleteFilter(task, tableSchema(), expectedSchema);
+  protected CloseableIterator<InternalRow> open(FileScanTask task) {
+    SparkDeleteFilter matches = new SparkDeleteFilter(task.file().path().toString(), task.deletes());
 
     // schema of rows returned by readers
     Schema requiredSchema = matches.requiredSchema();
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
     DataFile file = task.file();
 
     // update the current file for Spark's filename() function
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 6ebceee2f6..8590346197 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,185 +20,58 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
-class RowDataReader extends BaseDataReader<InternalRow> {
-
-  private final Schema tableSchema;
-  private final Schema expectedSchema;
-  private final String nameMapping;
-  private final boolean caseSensitive;
+class RowDataReader extends BaseRowReader<FileScanTask> {
+  RowDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema expectedSchema, boolean caseSensitive) {
+    super(table, task, expectedSchema, caseSensitive);
+  }
 
-  RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(table, task);
-    this.tableSchema = table.schema();
-    this.expectedSchema = expectedSchema;
-    this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
-    this.caseSensitive = caseSensitive;
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+    return Stream.concat(Stream.of(task.file()), task.deletes().stream());
   }
 
   @Override
-  CloseableIterator<InternalRow> open(FileScanTask task) {
-    SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);
+  protected CloseableIterator<InternalRow> open(FileScanTask task) {
+    String filePath = task.file().path().toString();
+    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes());
 
     // schema of rows returned by readers
-    Schema requiredSchema = deletes.requiredSchema();
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
-    DataFile file = task.file();
+    Schema requiredSchema = deleteFilter.requiredSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, requiredSchema);
 
     // update the current file for Spark's filename() function
-    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
-    return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
-  }
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
 
-  protected Schema tableSchema() {
-    return tableSchema;
+    return deleteFilter.filter(open(task, requiredSchema, idToConstant)).iterator();
   }
 
   protected CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
-    CloseableIterable<InternalRow> iter;
     if (task.isDataTask()) {
-      iter = newDataIterable(task.asDataTask(), readSchema);
+      return newDataIterable(task.asDataTask(), readSchema);
     } else {
-      InputFile location = getInputFile(task);
-      Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
-
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(location, task, readSchema, idToConstant);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(location, task, readSchema, idToConstant);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(location, task, readSchema, idToConstant);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
-      }
-    }
-
-    return iter;
-  }
-
-  private CloseableIterable<InternalRow> newAvroIterable(
-      InputFile location,
-      FileScanTask task,
-      Schema projection,
-      Map<Integer, ?> idToConstant) {
-    Avro.ReadBuilder builder = Avro.read(location)
-        .reuseContainers()
-        .project(projection)
-        .split(task.start(), task.length())
-        .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant));
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<InternalRow> newParquetIterable(
-      InputFile location,
-      FileScanTask task,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Parquet.ReadBuilder builder = Parquet.read(location)
-        .reuseContainers()
-        .split(task.start(), task.length())
-        .project(readSchema)
-        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
-        .filter(task.residual())
-        .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      InputFile inputFile = getInputFile(task.file().path().toString());
+      Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
+      return newIterable(inputFile, task.file().format(), task.start(), task.length(), task.residual(), readSchema,
+          idToConstant);
     }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<InternalRow> newOrcIterable(
-      InputFile location,
-      FileScanTask task,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
-        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    ORC.ReadBuilder builder = ORC.read(location)
-        .project(readSchemaWithoutConstantAndMetadataFields)
-        .split(task.start(), task.length())
-        .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
-        .filter(task.residual())
-        .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
   }
 
   private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
     StructInternalRow row = new StructInternalRow(readSchema.asStruct());
-    CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
-        task.asDataTask().rows(), row::setStruct);
-    return asSparkRows;
-  }
-
-  protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
-    private final InternalRowWrapper asStructLike;
-
-    SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
-      super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
-      this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
-    }
-
-    @Override
-    protected StructLike asStructLike(InternalRow row) {
-      return asStructLike.wrap(row);
-    }
-
-    @Override
-    protected InputFile getInputFile(String location) {
-      return RowDataReader.this.getInputFile(location);
-    }
-
-    @Override
-    protected void markRowDeleted(InternalRow row) {
-      row.setBoolean(columnIsDeletedPosition(), true);
-    }
+    return CloseableIterable.transform(task.asDataTask().rows(), row::setStruct);
   }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
similarity index 95%
rename from spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
rename to spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
index 870be890da..7f15cb28fa 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
@@ -27,10 +27,12 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileScanTask;
@@ -51,7 +53,7 @@ import org.junit.rules.TemporaryFolder;
 import static org.apache.iceberg.FileFormat.PARQUET;
 import static org.apache.iceberg.Files.localOutput;
 
-public class TestSparkBaseDataReader {
+public class TestBaseReader {
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
@@ -86,15 +88,20 @@ public class TestSparkBaseDataReader {
 
   // Main reader class to test base class iteration logic.
   // Keeps track of iterator closure.
-  private static class ClosureTrackingReader extends BaseDataReader<Integer> {
+  private static class ClosureTrackingReader extends BaseReader<Integer, FileScanTask> {
     private Map<String, CloseableIntegerRange> tracker = Maps.newHashMap();
 
     ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
-      super(table, new BaseCombinedScanTask(tasks));
+      super(table, new BaseCombinedScanTask(tasks), null, false);
     }
 
     @Override
-    CloseableIterator<Integer> open(FileScanTask task) {
+    protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+      return Stream.of();
+    }
+
+    @Override
+    protected CloseableIterator<Integer> open(FileScanTask task) {
       CloseableIntegerRange intRange = new CloseableIntegerRange(task.file().recordCount());
       tracker.put(getKey(task), intRange);
       return intRange;